[jira] [Resolved] (FLINK-1946) Make yarn tests logging less verbose

2017-01-25 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-1946.

Resolution: Fixed

This is resolved with 5b2ad7f03ef67ce529551e7b464d7db94e2a1d90. Closing this.

> Make yarn tests logging less verbose
> 
>
> Key: FLINK-1946
> URL: https://issues.apache.org/jira/browse/FLINK-1946
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: Till Rohrmann
>Priority: Minor
>
> Currently, the YARN tests log at the INFO level, making the test outputs 
> confusing. Furthermore, some status messages are written to stdout. I don't 
> think these messages need to be shown to the user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4837) flink-streaming-akka source connector

2017-01-25 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-4837.
--
Resolution: Won't Fix
  Assignee: Subhobrata Dey

> flink-streaming-akka source connector
> -
>
> Key: FLINK-4837
> URL: https://issues.apache.org/jira/browse/FLINK-4837
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Subhobrata Dey
>Assignee: Subhobrata Dey
>
> Hello,
> This issue is created to propose the idea of having a flink-streaming-akka 
> source connector.
> The source connector can be used to receive messages from an Akka feeder or 
> publisher actor; these messages can then be processed using Flink streaming.
> The source connector has the following features:
> 1. It supports several different message formats, such as iterable data, 
> byte arrays & data with timestamps.
> 2. It can send back acknowledgements to the feeder actor.
> Thanks & regards,
> Subhobrata
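
A minimal sketch of what such an acknowledging source could look like, assuming
Flink's SourceFunction API and Akka's UntypedActor; the AkkaSource and
ReceiverActor classes below are hypothetical illustrations, not the connector
that was actually contributed:

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

public class AkkaSource extends RichSourceFunction<Object> {

	private transient ActorSystem actorSystem;

	@Override
	public void run(SourceContext<Object> ctx) throws Exception {
		actorSystem = ActorSystem.create("flink-akka-source");
		// the receiver actor forwards each message into the source context
		// and acknowledges it to the sending feeder/publisher actor
		actorSystem.actorOf(Props.create(ReceiverActor.class, ctx), "receiver");
		actorSystem.awaitTermination(); // block until cancel() shuts the system down
	}

	@Override
	public void cancel() {
		if (actorSystem != null) {
			actorSystem.shutdown();
		}
	}

	public static class ReceiverActor extends UntypedActor {
		private final SourceContext<Object> ctx;

		public ReceiverActor(SourceContext<Object> ctx) {
			this.ctx = ctx;
		}

		@Override
		public void onReceive(Object message) {
			ctx.collect(message);               // hand the payload to Flink
			getSender().tell("ack", getSelf()); // acknowledge to the feeder
		}
	}
}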



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4837) flink-streaming-akka source connector

2017-01-25 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839390#comment-15839390
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4837:


Seems like this JIRA can be closed, as the connector has been moved to Bahir.

> flink-streaming-akka source connector
> -
>
> Key: FLINK-4837
> URL: https://issues.apache.org/jira/browse/FLINK-4837
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Subhobrata Dey
>
> Hello,
> This issue is created to propose the idea of having a flink-streaming-akka 
> source connector.
> The source connector can be used to receive messages from an Akka feeder or 
> publisher actor; these messages can then be processed using Flink streaming.
> The source connector has the following features:
> 1. It supports several different message formats, such as iterable data, 
> byte arrays & data with timestamps.
> 2. It can send back acknowledgements to the feeder actor.
> Thanks & regards,
> Subhobrata



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5650) flink-python unit test costs more than half an hour

2017-01-25 Thread shijinkui (JIRA)
shijinkui created FLINK-5650:


 Summary: flink-python unit test costs more than half an hour
 Key: FLINK-5650
 URL: https://issues.apache.org/jira/browse/FLINK-5650
 Project: Flink
  Issue Type: Bug
  Components: Python API
Reporter: shijinkui


When executing `mvn clean test` in flink-python, it waits for more than half an 
hour after the console output below:
---
 T E S T S
---
Running org.apache.flink.python.api.PythonPlanBinderTest
log4j:WARN No appenders could be found for logger 
(org.apache.flink.python.api.PythonPlanBinderTest).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.



The stack trace below:
"main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition 
[0x79fd8000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at 
org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70)
at 
org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50)
at 
org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211)
at 
org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141)
at 
org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114)
at 
org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83)
at 
org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174)
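
The sleep in PythonPlanStreamer.startPython() suggests the test is waiting for
the spawned Python process to come up. A minimal sketch of how such a wait
could be bounded so the build fails fast instead of hanging; the serverIsUp()
readiness check is a hypothetical placeholder, not the actual PythonPlanStreamer
code:

import java.util.concurrent.TimeUnit;

// Poll a readiness check against a hard deadline instead of sleeping open-endedly.
static void waitForPythonProcess(long timeoutMillis) throws InterruptedException {
	long deadline = System.currentTimeMillis() + timeoutMillis;
	while (!serverIsUp()) { // hypothetical readiness check
		if (System.currentTimeMillis() > deadline) {
			throw new RuntimeException(
				"Python process did not come up within " + timeoutMillis + " ms");
		}
		TimeUnit.MILLISECONDS.sleep(100);
	}
}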



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4463) FLIP-3: Restructure Documentation

2017-01-25 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839354#comment-15839354
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4463 at 1/26/17 7:12 AM:
-

With the recent efforts in adding documentation for the 1.2 release, I think 
some of the subtasks here are actually already completed by other duplicate 
JIRAs.


was (Author: tzulitai):
I think some of the subtasks here are actually already completed by other 
duplicate JIRAs, with the recent efforts in adding documentation for the 1.2 
release.

> FLIP-3: Restructure Documentation
> -
>
> Key: FLINK-4463
> URL: https://issues.apache.org/jira/browse/FLINK-4463
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>
> Super issue to track progress for 
> [FLIP-3|https://cwiki.apache.org/confluence/display/FLINK/FLIP-3+-+Organization+of+Documentation].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4463) FLIP-3: Restructure Documentation

2017-01-25 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839354#comment-15839354
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4463:


I think some of the subtasks here are actually already completed by other 
duplicate JIRAs, with the recent efforts in adding documentation for the 1.2 
release.

> FLIP-3: Restructure Documentation
> -
>
> Key: FLINK-4463
> URL: https://issues.apache.org/jira/browse/FLINK-4463
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>
> Super issue to track progress for 
> [FLIP-3|https://cwiki.apache.org/confluence/display/FLINK/FLIP-3+-+Organization+of+Documentation].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (FLINK-4473) Add docs about end-to-end exactly once

2017-01-25 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4473:
---
Comment: was deleted

(was: I think some of the subtasks here are actually already completed by other 
duplicate JIRAs, with the recent efforts in adding documentation for the 1.2 
release.)

> Add docs about end-to-end exactly once
> --
>
> Key: FLINK-4473
> URL: https://issues.apache.org/jira/browse/FLINK-4473
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Ufuk Celebi
>
> Add a page about end-to-end exactly-once processing semantics in relation to 
> connectors and processing semantics in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4473) Add docs about end-to-end exactly once

2017-01-25 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839352#comment-15839352
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4473:


I think some of the subtasks here are actually already completed by other 
duplicate JIRAs, with the recent efforts in adding documentation for the 1.2 
release.

> Add docs about end-to-end exactly once
> --
>
> Key: FLINK-4473
> URL: https://issues.apache.org/jira/browse/FLINK-4473
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Ufuk Celebi
>
> Add a page about end-to-end exactly-once processing semantics in relation to 
> connectors and processing semantics in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839336#comment-15839336
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user coveralls commented on the issue:

https://github.com/apache/flink/pull/605
  

[![Coverage 
Status](https://coveralls.io/builds/9850383/badge)](https://coveralls.io/builds/9850383)

Changes Unknown when pulling **342f5c99cc530ccf2a6281223d1f4f917f1fb497 on 
tammymendt:FLINK-1297-v2** into ** on apache:master**.



> Add support for tracking statistics of intermediate results
> ---
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Alexander Alexandrov
>Assignee: Tamara
> Fix For: 0.10.0
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback from the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.
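
As a rough illustration of the statistics facility described above, here is a
minimal sketch of a Flink Accumulator that tracks min, max and count over long
values; the class name and the Tuple3 result type are illustrative only, not
the contributed design:

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.java.tuple.Tuple3;

public class LongStatsAccumulator implements Accumulator<Long, Tuple3<Long, Long, Long>> {

	private long min = Long.MAX_VALUE;
	private long max = Long.MIN_VALUE;
	private long count;

	@Override
	public void add(Long value) {
		min = Math.min(min, value);
		max = Math.max(max, value);
		count++;
	}

	@Override
	public Tuple3<Long, Long, Long> getLocalValue() {
		return Tuple3.of(min, max, count); // (min, max, count)
	}

	@Override
	public void resetLocal() {
		min = Long.MAX_VALUE;
		max = Long.MIN_VALUE;
		count = 0;
	}

	@Override
	public void merge(Accumulator<Long, Tuple3<Long, Long, Long>> other) {
		Tuple3<Long, Long, Long> o = other.getLocalValue();
		min = Math.min(min, o.f0);
		max = Math.max(max, o.f1);
		count += o.f2;
	}

	@Override
	public Accumulator<Long, Tuple3<Long, Long, Long>> clone() {
		LongStatsAccumulator copy = new LongStatsAccumulator();
		copy.min = min;
		copy.max = max;
		copy.count = count;
		return copy;
	}
}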



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #605: [FLINK-1297] Added OperatorStatsAccumulator for tracking o...

2017-01-25 Thread coveralls
Github user coveralls commented on the issue:

https://github.com/apache/flink/pull/605
  

[![Coverage 
Status](https://coveralls.io/builds/9850383/badge)](https://coveralls.io/builds/9850383)

Changes Unknown when pulling **342f5c99cc530ccf2a6281223d1f4f917f1fb497 on 
tammymendt:FLINK-1297-v2** into ** on apache:master**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839328#comment-15839328
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
Fixed all the minor comments given above. @tonycox, @wuchong, @fhueske.


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from an HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.
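
For reference, a minimal sketch of reading an HBase table through the existing
{{TableInputFormat}}, which the proposed source could build on; the table name,
column family and qualifier here ("test", "f1", "q1") are made up for
illustration:

import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class SampleHBaseInput extends TableInputFormat<Tuple2<String, String>> {

	@Override
	protected String getTableName() {
		return "test"; // hypothetical table name
	}

	@Override
	protected Scan getScanner() {
		// restrict the scan to one column, analogous to projection push-down
		return new Scan().addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
	}

	@Override
	protected Tuple2<String, String> mapResultToTuple(Result r) {
		String rowKey = Bytes.toString(r.getRow());
		String value = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1")));
		return Tuple2.of(rowKey, value);
	}
}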



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
Fixed all the minor comments given above. @tonycox, @wuchong, @fhueske.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839322#comment-15839322
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1962
  
Thank you for picking this up again @sbcd90. I would wait until #3112 is 
merged before rebasing.


> Add reconnect attempt to Elasticsearch host
> ---
>
> Key: FLINK-3857
> URL: https://issues.apache.org/jira/browse/FLINK-3857
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.1.0, 1.0.2
>Reporter: Fabian Hueske
>Assignee: Subhobrata Dey
>
> Currently, the connection to the Elasticsearch host is opened in 
> {{ElasticsearchSink.open()}}. In case the connection is lost (maybe due to a 
> changed DNS entry), the sink fails.
> I propose to catch the exception for lost connections in the {{invoke()}} 
> method and try to re-open the connection a configurable number of times, 
> with a certain delay between attempts.
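
A minimal sketch of the proposed retry loop; maxRetries, retryDelayMillis and
the send()/reopenConnection() helpers are placeholders, not the actual
ElasticsearchSink internals:

@Override
public void invoke(T element) throws Exception {
	for (int attempt = 0; ; attempt++) {
		try {
			send(element); // placeholder: build and send the index request
			return;
		} catch (Exception e) {
			if (attempt >= maxRetries) {
				throw e; // configured number of attempts exhausted
			}
			Thread.sleep(retryDelayMillis); // configurable delay between attempts
			reopenConnection(); // placeholder: re-create the client connection
		}
	}
}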



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

2017-01-25 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1962
  
Thank you for picking this up again @sbcd90. I would wait until #3112 is 
merged before rebasing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3031: [FLINK-4616] Added functionality through which wat...

2017-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97935636
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -175,34 +176,115 @@ protected AbstractFetcher(
	// ------------------------------------------------------------------------

	/**
-	 * Takes a snapshot of the partition offsets.
+	 * Takes a snapshot of the partition offsets and watermarks.
	 *
	 * Important: This method must be called under the checkpoint lock.
	 *
-	 * @return A map from partition to current offset.
+	 * @return A map from partition to current offset and watermark.
	 */
-	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+	public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
		// this method assumes that the checkpoint lock is held
		assert Thread.holdsLock(checkpointLock);

-		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitions()) {
-			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+		HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+
+				for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
--- End diff --

Excessive empty line above this line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3031: [FLINK-4616] Added functionality through which wat...

2017-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97757847
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -101,7 +101,7 @@
	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
	private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;

-	private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
+	private transient ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> offsetsAndWatermarksStateForCheckpoint;
--- End diff --

I think we should switch to have a specific checkpointed state object 
instead of continuing to "extend" the original Tuple. This will also be helpful 
for compatibility for any future changes to the checkpointed state.
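
A minimal sketch of such a dedicated checkpoint state class; the name and
accessors are illustrative, not what the PR ultimately used:

import java.io.Serializable;

public class KafkaPartitionCheckpointState implements Serializable {

	private static final long serialVersionUID = 1L;

	private final KafkaTopicPartition partition;
	private final long offset;
	private final long watermarkTimestamp;

	public KafkaPartitionCheckpointState(
			KafkaTopicPartition partition, long offset, long watermarkTimestamp) {
		this.partition = partition;
		this.offset = offset;
		this.watermarkTimestamp = watermarkTimestamp;
	}

	// named accessors read better than Tuple2's f0/f1 at the call sites
	public KafkaTopicPartition getPartition() { return partition; }
	public long getOffset() { return offset; }
	public long getWatermarkTimestamp() { return watermarkTimestamp; }
}

A dedicated class also leaves room to evolve the checkpoint format (e.g. adding
fields) without changing the state's outward type.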


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839321#comment-15839321
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97935971
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java ---
@@ -61,6 +61,10 @@ public long getCurrentWatermarkTimestamp() {
		return partitionWatermark;
	}

+	void setCurrentWatermarkTimestamp(long watermarkTimestamp) {
--- End diff --

The other methods seem to be `public` (although they can actually be 
package-private). Should we stay consistent with that here?


> Kafka consumer doesn't store last emitted watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.1
>Reporter: Yuri Makhno
>Assignee: Roman Maier
>
> The Kafka consumer stores only the Kafka offsets in state and doesn't store the 
> last emitted watermarks per partition; this may lead to a wrong state when a 
> checkpoint is restored.
> Let's say our watermark is (timestamp - 10). With the following message queue, 
> the results after a checkpoint restore will differ from normal processing:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered by next time window
> D(ts=60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839317#comment-15839317
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97935696
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -175,34 +176,115 @@ protected AbstractFetcher(
	// ------------------------------------------------------------------------

	/**
-	 * Takes a snapshot of the partition offsets.
+	 * Takes a snapshot of the partition offsets and watermarks.
	 *
	 * Important: This method must be called under the checkpoint lock.
	 *
-	 * @return A map from partition to current offset.
+	 * @return A map from partition to current offset and watermark.
	 */
-	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+	public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
		// this method assumes that the checkpoint lock is held
		assert Thread.holdsLock(checkpointLock);

-		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitions()) {
-			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+		HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+
+				for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE));
+				}
+
+				return state;
+			}
+
+			case PERIODIC_WATERMARKS: {
+				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp()));
+				}
+
+				return state;
+			}
+
+			case PUNCTUATED_WATERMARKS: {
+				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark()));
+				}
+
+				return state;
+			}
+
+			default:
+				// cannot happen, add this as a guard for the future
+				throw new RuntimeException();
--- End diff --

Would be good to have a reason message here.
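
For example, the guard could carry the unexpected mode, along the lines of:

default:
	// cannot happen, guard against future timestamp/watermark modes
	throw new RuntimeException(
		"Unknown timestamp/watermark mode: " + timestampWatermarkMode);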


> Kafka consumer doesn't store last emitted watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.1
>Reporter: Yuri Makhno
>Assignee: Roman Maier
>
> The Kafka consumer stores only the Kafka offsets in state and doesn't store the 
> last emitted watermarks per partition; this may lead to a wrong state when a 
> checkpoint is restored.
> Let's say our watermark is (timestamp - 10). With the following message queue, 
> the results after a checkpoint restore will differ from normal processing:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered by next time window
> D(ts=60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839320#comment-15839320
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97758477
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -345,39 +345,39 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
			LOG.debug("snapshotState() called on closed source");
		} else {

-			offsetsStateForCheckpoint.clear();
+			offsetsAndWatermarksStateForCheckpoint.clear();

			final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
			if (fetcher == null) {
				// the fetcher has not yet been initialized, which means we need to return the
-				// originally restored offsets or the assigned partitions
+				// originally restored offsets and watermarks or the assigned partitions

-				if (restoreToOffset != null) {
+				if (restoreToOffsetAndWatermark != null) {

-					for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
-						offsetsStateForCheckpoint.add(
-							Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
+					for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark : restoreToOffsetAndWatermark.entrySet()) {
+						offsetsAndWatermarksStateForCheckpoint.add(
+							Tuple2.of(kafkaTopicPartitionOffsetAndWatermark.getKey(), kafkaTopicPartitionOffsetAndWatermark.getValue()));
--- End diff --

Having a specific checkpoint state object will also be helpful for code 
readability in situations like this one (it's quite tricky to understand 
quickly what the key / value refers to, as well as some of the `f0`, `f1` calls 
in other parts of the PR. I know the previous code used `f0` and `f1` also, but 
I think it's a good opportunity to improve that).


> Kafka consumer doesn't store last emitted watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.1
>Reporter: Yuri Makhno
>Assignee: Roman Maier
>
> The Kafka consumer stores only the Kafka offsets in state and doesn't store the 
> last emitted watermarks per partition; this may lead to a wrong state when a 
> checkpoint is restored.
> Let's say our watermark is (timestamp - 10). With the following message queue, 
> the results after a checkpoint restore will differ from normal processing:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered by next time window
> D(ts=60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839316#comment-15839316
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97935720
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -175,34 +176,115 @@ protected AbstractFetcher(
	// ------------------------------------------------------------------------

	/**
-	 * Takes a snapshot of the partition offsets.
+	 * Takes a snapshot of the partition offsets and watermarks.
	 *
	 * Important: This method must be called under the checkpoint lock.
	 *
-	 * @return A map from partition to current offset.
+	 * @return A map from partition to current offset and watermark.
	 */
-	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+	public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
		// this method assumes that the checkpoint lock is held
		assert Thread.holdsLock(checkpointLock);

-		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitions()) {
-			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+		HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+
+				for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE));
+				}
+
+				return state;
+			}
+
+			case PERIODIC_WATERMARKS: {
+				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp()));
+				}
+
+				return state;
+			}
+
+			case PUNCTUATED_WATERMARKS: {
+				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark()));
+				}
+
+				return state;
+			}
+
+			default:
+				// cannot happen, add this as a guard for the future
+				throw new RuntimeException();
		}
-		return state;
	}

	/**
-	 * Restores the partition offsets.
+	 * Restores the partition offsets and watermarks.
	 *
-	 * @param snapshotState The offsets for the partitions
+	 * @param snapshotState The offsets and watermarks for the partitions
	 */
-	public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
-		for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
-			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
-			if (offset != null) {
-				partition.setOffset(offset);
+	public void restoreOffsetsAndWatermarks(Map<KafkaTopicPartition, Tuple2<Long, Long>> snapshotState) {
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+				for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
+					Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
+					if (offset != null) {
+						partition.setOffset(offset);
+					}
+				}
+				break;
+			}
+
+			case PERIODIC_WATERMARKS: {

[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839318#comment-15839318
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97757847
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -101,7 +101,7 @@
	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
	private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;

-	private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
+	private transient ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> offsetsAndWatermarksStateForCheckpoint;
--- End diff --

I think we should switch to have a specific checkpointed state object 
instead of continuing to "extend" the original Tuple. This will also be helpful 
for compatibility for any future changes to the checkpointed state.


> Kafka consumer doesn't store last emitted watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.1
>Reporter: Yuri Makhno
>Assignee: Roman Maier
>
> The Kafka consumer stores only the Kafka offsets in state and doesn't store the 
> last emitted watermarks per partition; this may lead to a wrong state when a 
> checkpoint is restored.
> Let's say our watermark is (timestamp - 10). With the following message queue, 
> the results after a checkpoint restore will differ from normal processing:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered by next time window
> D(ts=60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839319#comment-15839319
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97935636
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -175,34 +176,115 @@ protected AbstractFetcher(
	// ------------------------------------------------------------------------

	/**
-	 * Takes a snapshot of the partition offsets.
+	 * Takes a snapshot of the partition offsets and watermarks.
	 *
	 * Important: This method must be called under the checkpoint lock.
	 *
-	 * @return A map from partition to current offset.
+	 * @return A map from partition to current offset and watermark.
	 */
-	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+	public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
		// this method assumes that the checkpoint lock is held
		assert Thread.holdsLock(checkpointLock);

-		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitions()) {
-			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+		HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+
+				for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
--- End diff --

Excessive empty line above this line.


> Kafka consumer doesn't store last emitted watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.1
>Reporter: Yuri Makhno
>Assignee: Roman Maier
>
> The Kafka consumer stores only the Kafka offsets in state and doesn't store the 
> last emitted watermarks per partition; this may lead to a wrong state when a 
> checkpoint is restored.
> Let's say our watermark is (timestamp - 10). With the following message queue, 
> the results after a checkpoint restore will differ from normal processing:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered by next time window
> D(ts=60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3031: [FLINK-4616] Added functionality through which wat...

2017-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97935971
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java ---
@@ -61,6 +61,10 @@ public long getCurrentWatermarkTimestamp() {
		return partitionWatermark;
	}

+	void setCurrentWatermarkTimestamp(long watermarkTimestamp) {
--- End diff --

The other methods seem to be `public` (although they can actually be 
package-private). Should we stay consistent with that here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3031: [FLINK-4616] Added functionality through which wat...

2017-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97935696
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -175,34 +176,115 @@ protected AbstractFetcher(
	// ------------------------------------------------------------------------

	/**
-	 * Takes a snapshot of the partition offsets.
+	 * Takes a snapshot of the partition offsets and watermarks.
	 *
	 * Important: This method must be called under the checkpoint lock.
	 *
-	 * @return A map from partition to current offset.
+	 * @return A map from partition to current offset and watermark.
	 */
-	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+	public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
		// this method assumes that the checkpoint lock is held
		assert Thread.holdsLock(checkpointLock);

-		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitions()) {
-			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+		HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+
+				for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE));
+				}
+
+				return state;
+			}
+
+			case PERIODIC_WATERMARKS: {
+				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp()));
+				}
+
+				return state;
+			}
+
+			case PUNCTUATED_WATERMARKS: {
+				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark()));
+				}
+
+				return state;
+			}
+
+			default:
+				// cannot happen, add this as a guard for the future
+				throw new RuntimeException();
--- End diff --

Would be good to have a reason message here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3031: [FLINK-4616] Added functionality through which wat...

2017-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97758477
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -345,39 +345,39 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
			LOG.debug("snapshotState() called on closed source");
		} else {

-			offsetsStateForCheckpoint.clear();
+			offsetsAndWatermarksStateForCheckpoint.clear();

			final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
			if (fetcher == null) {
				// the fetcher has not yet been initialized, which means we need to return the
-				// originally restored offsets or the assigned partitions
+				// originally restored offsets and watermarks or the assigned partitions

-				if (restoreToOffset != null) {
+				if (restoreToOffsetAndWatermark != null) {

-					for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
-						offsetsStateForCheckpoint.add(
-							Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
+					for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark : restoreToOffsetAndWatermark.entrySet()) {
+						offsetsAndWatermarksStateForCheckpoint.add(
+							Tuple2.of(kafkaTopicPartitionOffsetAndWatermark.getKey(), kafkaTopicPartitionOffsetAndWatermark.getValue()));
--- End diff --

Having a specific checkpoint state object will also be helpful for code 
readability in situations like this one (it's quite tricky to understand 
quickly what the key / value refers to, as well as some of the `f0`, `f1` calls 
in other parts of the PR. I know the previous code used `f0` and `f1` also, but 
I think it's a good opportunity to improve that).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3031: [FLINK-4616] Added functionality through which wat...

2017-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97935720
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -175,34 +176,115 @@ protected AbstractFetcher(
	// ------------------------------------------------------------------------

	/**
-	 * Takes a snapshot of the partition offsets.
+	 * Takes a snapshot of the partition offsets and watermarks.
	 *
	 * Important: This method must be called under the checkpoint lock.
	 *
-	 * @return A map from partition to current offset.
+	 * @return A map from partition to current offset and watermark.
	 */
-	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+	public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
		// this method assumes that the checkpoint lock is held
		assert Thread.holdsLock(checkpointLock);

-		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-		for (KafkaTopicPartitionState<KPH> partition : subscribedPartitions()) {
-			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+		HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+
+				for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE));
+				}
+
+				return state;
+			}
+
+			case PERIODIC_WATERMARKS: {
+				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp()));
+				}
+
+				return state;
+			}
+
+			case PUNCTUATED_WATERMARKS: {
+				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) allPartitions;
+
+				for (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partition : partitions) {
+					state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark()));
+				}
+
+				return state;
+			}
+
+			default:
+				// cannot happen, add this as a guard for the future
+				throw new RuntimeException();
		}
-		return state;
	}

	/**
-	 * Restores the partition offsets.
+	 * Restores the partition offsets and watermarks.
	 *
-	 * @param snapshotState The offsets for the partitions
+	 * @param snapshotState The offsets and watermarks for the partitions
	 */
-	public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
-		for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
-			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
-			if (offset != null) {
-				partition.setOffset(offset);
+	public void restoreOffsetsAndWatermarks(Map<KafkaTopicPartition, Tuple2<Long, Long>> snapshotState) {
+
+		switch (timestampWatermarkMode) {
+
+			case NO_TIMESTAMPS_WATERMARKS: {
+				for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
+					Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
+					if (offset != null) {
+						partition.setOffset(offset);
+					}
+				}
+				break;
+			}
+
+			case PERIODIC_WATERMARKS: {

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839312#comment-15839312
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97936002
  
--- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter {
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991L));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97936002
  
--- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter {
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991L));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19992L));
+   puts.add(put);
+
+   put = new Put(ROW_3);
+   // add 3 

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839289#comment-15839289
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934510
  
--- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter {
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991L));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934537
  
--- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws 
Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : 
value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19992l));
+   puts.add(put);
+
+   put = new Put(ROW_3);
+   // add 

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839290#comment-15839290
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934537
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws 
Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : 
value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934510
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws 
Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : 
value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19992l));
+   puts.add(put);
+
+   put = new Put(ROW_3);
+   // add 

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839288#comment-15839288
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934502
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
--- End diff --

ya . My bad. Will remove.


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
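
For context on the diff quoted above: the test writes q1 as an int, q2 as a String, and q3 as a long, all through HBase's Bytes encoding. A minimal sketch of how such typed cells decode on the read side — the class name and the Get/Result plumbing here are illustrative assumptions, not part of the PR:

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class DecodeSketch {
	// Reads back the three typed qualifiers the test writes for row1:
	// q1 is int-encoded, q2 is String-encoded, q3 is long-encoded.
	static void printRow1(HTable table) throws IOException {
		Result r = table.get(new Get(Bytes.toBytes("row1")));
		int q1 = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1")));
		String q2 = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q2")));
		long q3 = Bytes.toLong(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q3")));
		System.out.println(q1 + " / " + q2 + " / " + q3);
	}
}
```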


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839287#comment-15839287
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934495
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Creates a table source that helps to scan data from an HBase table.
+ *
+ * Note: the colNames are specified along with a familyName, separated by a ':'.
+ * For example, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name.
+ */
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource<Row> {
--- End diff --

I am not sure. For now I think we will implement BatchTableSource only and 
implement StreamTableSource later. Is there any significant design expectation 
for a source to be a StreamTableSource?


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
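
To make the Batch/Stream question above concrete: a batch table source produces a DataSet, a streaming one a DataStream. A hedged sketch of the distinction — the two interfaces below are simplified stand-ins defined inside the snippet, not the real flink-table interfaces, which carry additional methods (field names, field types, return type):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

// Simplified stand-in for a batch-flavored table source.
interface BatchSourceSketch<T> {
	DataSet<T> getDataSet(ExecutionEnvironment execEnv);
}

// Simplified stand-in for a stream-flavored table source.
interface StreamSourceSketch<T> {
	DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}

// A source supporting both modes would implement both, sharing the HBase
// scan logic; batch-only support means implementing only the first.
class DualModeHBaseSourceSketch implements BatchSourceSketch<Row>, StreamSourceSketch<Row> {
	@Override
	public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
		throw new UnsupportedOperationException("sketch only");
	}

	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
		throw new UnsupportedOperationException("sketch only");
	}
}
```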


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934502
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
--- End diff --

ya . My bad. Will remove.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934495
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Creates a table source that helps to scan data from an HBase table.
+ *
+ * Note: the colNames are specified along with a familyName, separated by a ':'.
+ * For example, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name.
+ */
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource<Row> {
--- End diff --

I am not sure. For now I think we will implement BatchTableSource only and 
implement StreamTableSource later. Is there any significant design expectation 
for a source to be a StreamTableSource?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
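
The javadoc quoted above defines the "cf1:q1" naming convention for column names. A small sketch of what that convention means — the parsing shown here is illustrative only; the PR may split the name differently:

```java
// Splits a "family:qualifier" column name as described in the javadoc:
// the part before ':' is the family name, the part after is the qualifier.
public class ColNameSketch {
	public static void main(String[] args) {
		String colName = "cf1:q1";
		int sep = colName.indexOf(':');
		String family = colName.substring(0, sep);     // "cf1"
		String qualifier = colName.substring(sep + 1); // "q1"
		System.out.println(family + " / " + qualifier);
	}
}
```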


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839285#comment-15839285
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934446
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+           new HashMap<String, List<Pair<String, TypeInformation<?>>>>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class<?>[] CLASS_TYPES = {
+           Integer.class, Short.class, Float.class, Long.class, String.class,
+           Byte.class, Boolean.class, Double.class, BigInteger.class,
+           BigDecimal.class, Date.class, byte[].class
+   };
+   /**
+* Allows specifying the family and qualifier name along with the data 
type of the qualifier for an HBase table
+*
+* @param family    the family name
+* @param qualifier the qualifier name
+* @param clazz the data type of the qualifier
+*/
+   public void addColumn(String family, String qualifier, Class<?> clazz) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(family, "qualifier name");
--- End diff --

Good catch.


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
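
The "Good catch" above refers to the second checkNotNull validating `family` a second time while its message says "qualifier name". A sketch of the implied fix; the extra check on `clazz` is an added suggestion, not confirmed from the diff:

```java
import org.apache.flink.util.Preconditions;

public class AddColumnFixSketch {
	// Corrected null checks: the second check must validate the qualifier.
	static void validate(String family, String qualifier, Class<?> clazz) {
		Preconditions.checkNotNull(family, "family name");
		Preconditions.checkNotNull(qualifier, "qualifier name");
		Preconditions.checkNotNull(clazz, "class type"); // extra check, suggested here
	}
}
```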


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839283#comment-15839283
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934406
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+           new HashMap<String, List<Pair<String, TypeInformation<?>>>>();
--- End diff --

Ok. In our other projects we used to qualify the generic on both sides.


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
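
For the generics point above: since Java 7, the right-hand side can use the diamond operator instead of repeating the full type arguments. A sketch contrasting the two forms for the field in question:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.hadoop.hbase.util.Pair;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DiamondSketch {
	// Fully qualified on both sides, as in the quoted diff (pre-Java-7 style):
	private final Map<String, List<Pair<String, TypeInformation<?>>>> verbose =
			new HashMap<String, List<Pair<String, TypeInformation<?>>>>();

	// Equivalent with the Java 7 diamond operator, which the review points toward:
	private final Map<String, List<Pair<String, TypeInformation<?>>>> concise =
			new HashMap<>();
}
```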


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934428
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+           new HashMap<String, List<Pair<String, TypeInformation<?>>>>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
--- End diff --

Ok. Makes sense.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839284#comment-15839284
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934428
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+           new HashMap<String, List<Pair<String, TypeInformation<?>>>>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
--- End diff --

Ok. Makes sense.


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934446
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+           new HashMap<String, List<Pair<String, TypeInformation<?>>>>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class<?>[] CLASS_TYPES = {
+           Integer.class, Short.class, Float.class, Long.class, String.class,
+           Byte.class, Boolean.class, Double.class, BigInteger.class,
+           BigDecimal.class, Date.class, byte[].class
+   };
+   /**
+* Allows specifying the family and qualifier name along with the data 
type of the qualifier for an HBase table
+*
+* @param family    the family name
+* @param qualifier the qualifier name
+* @param clazz the data type of the qualifier
+*/
+   public void addColumn(String family, String qualifier, Class<?> clazz) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(family, "qualifier name");
--- End diff --

Good catch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
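
Based only on the addColumn(String, String, Class) signature visible in the diff above, a minimal usage sketch for the schema class; the value classes used come from the quoted CLASS_TYPES array:

```java
import org.apache.flink.addons.hbase.HBaseTableSchema;

public class SchemaUsageSketch {
	public static void main(String[] args) {
		// family/qualifier names follow the "cf1:q1" convention
		HBaseTableSchema schema = new HBaseTableSchema();
		schema.addColumn("f1", "q1", Integer.class); // int-typed qualifier
		schema.addColumn("f1", "q2", String.class);  // String-typed qualifier
		schema.addColumn("f1", "q3", Long.class);    // long-typed qualifier
	}
}
```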


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934406
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+           new HashMap<String, List<Pair<String, TypeInformation<?>>>>();
--- End diff --

Ok. In our other projects we used to qualify the generic on both sides.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97927186
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws 
Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : 
value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19992l));
+   puts.add(put);
+
+   put = new Put(ROW_3);
+   // add 3 

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839127#comment-15839127
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97927186
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws 
Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : 
value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   

[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

2017-01-25 Thread sbcd90
Github user sbcd90 commented on the issue:

https://github.com/apache/flink/pull/1962
  
Hi @tzulitai , thanks for the updates. I'll refactor the code & will rebase 
the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839115#comment-15839115
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user sbcd90 commented on the issue:

https://github.com/apache/flink/pull/1962
  
Hi @tzulitai , thanks for the updates. I'll refactor the code & will rebase 
the PR.


> Add reconnect attempt to Elasticsearch host
> ---
>
> Key: FLINK-3857
> URL: https://issues.apache.org/jira/browse/FLINK-3857
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.1.0, 1.0.2
>Reporter: Fabian Hueske
>Assignee: Subhobrata Dey
>
> Currently, the connection to the Elasticsearch host is opened in 
> {{ElasticsearchSink.open()}}. In case the connection is lost (maybe due to a 
> changed DNS entry), the sink fails.
> I propose to catch the Exception for lost connections in the {{invoke()}} 
> method and try to re-open the connection for a configurable number of times 
> with a certain delay.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
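
The proposal above is to catch lost-connection exceptions in invoke() and retry re-opening the connection a configurable number of times with a delay. A schematic sketch of that behavior — all names (sendRequest, reopenConnection, maxRetries, retryDelayMillis) are hypothetical; the actual PR may structure this differently:

```java
public class ReconnectSketch {
	private final int maxRetries = 3;           // configurable in the proposal
	private final long retryDelayMillis = 1000; // configurable in the proposal

	void invoke(Object element) throws Exception {
		for (int attempt = 0; ; attempt++) {
			try {
				sendRequest(element);
				return;
			} catch (java.io.IOException e) { // stand-in for a lost connection
				if (attempt >= maxRetries) {
					throw e; // give up after the configured number of attempts
				}
				Thread.sleep(retryDelayMillis);
				reopenConnection();
			}
		}
	}

	void sendRequest(Object element) throws java.io.IOException { /* ... */ }

	void reopenConnection() { /* ... */ }
}
```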


[jira] [Commented] (FLINK-5546) java.io.tmpdir setted as project build directory in surefire plugin

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839107#comment-15839107
 ] 

ASF GitHub Bot commented on FLINK-5546:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3190
  
" /tmp/cacheFile (Permission denied)" exception of unit test can be replay.

1. on linux env, `sudo - userA`, clone the flink code, start to `mvn clean 
test verify`
2. on the linux machine, `sudo - userB`, clone the flink codebase, same 
start to `mvn clean test verify`

Because two use share the same  `/tmp` directory which have no random sub 
directory.
IMO, `/tmp` change to `${project.build.directory}`, at same time have 
random suffix. This can resolve the problem thoroughly.



> java.io.tmpdir setted as project build directory in surefire plugin
> ---
>
> Key: FLINK-5546
> URL: https://issues.apache.org/jira/browse/FLINK-5546
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.2.0, 1.3.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>Assignee: shijinkui
>
> When multiple Linux users run test at the same time, flink-runtime module may 
> fail. User A creates /tmp/cacheFile, and User B will have no permission to 
> visit the fold.  
> Failed tests: 
> FileCacheDeleteValidationTest.setup:79 Error initializing the test: 
> /tmp/cacheFile (Permission denied)
> Tests in error: 
> IOManagerTest.channelEnumerator:54 » Runtime Could not create storage 
> director...
> Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
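
To illustrate the proposal above — per-module temp directories with a random suffix instead of the shared /tmp — a hedged Java sketch. It assumes the build (e.g. surefire) exports project.build.directory as a system property; the "target" fallback and helper name are illustration only:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class TempDirSketch {
	// Creates a randomized temp directory under the module's build directory,
	// so concurrent users and concurrent builds no longer collide on one path.
	static Path testTempDir() throws IOException {
		Path buildDir = Paths.get(System.getProperty("project.build.directory", "target"));
		Files.createDirectories(buildDir);
		// createTempDirectory appends a random suffix to the given prefix
		return Files.createTempDirectory(buildDir, "flink-test-");
	}
}
```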


[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...

2017-01-25 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3190
  
" /tmp/cacheFile (Permission denied)" exception of unit test can be replay.

1. on linux env, `sudo - userA`, clone the flink code, start to `mvn clean 
test verify`
2. on the linux machine, `sudo - userB`, clone the flink codebase, same 
start to `mvn clean test verify`

Because two use share the same  `/tmp` directory which have no random sub 
directory.
IMO, `/tmp` change to `${project.build.directory}`, at same time have 
random suffix. This can resolve the problem thoroughly.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839039#comment-15839039
 ] 

ASF GitHub Bot commented on FLINK-5635:
---

Github user jgrier commented on the issue:

https://github.com/apache/flink/pull/3205
  
Thanks @ex00.  I missed that.  Actually there is a new Flink on Docker stub 
in the top-level docs, and I was planning to document these scripts there; 
however, we may want to keep this here and just link to it.  Anyway, I will 
update the docs.  Thanks.


> Improve Docker tooling to make it easier to build images and launch Flink via 
> Docker tools
> --
>
> Key: FLINK-5635
> URL: https://issues.apache.org/jira/browse/FLINK-5635
> Project: Flink
>  Issue Type: Improvement
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
>Assignee: Jamie Grier
> Fix For: 1.2.0
>
>
> This is a bit of a catch-all ticket for general improvements to the Flink on 
> Docker experience.
> Things to improve:
>   - Make it possible to build a Docker image from your own flink-dist 
> directory as well as official releases.
>   - Make it possible to override the image name so a user can more easily 
> publish these images to their Docker repository
>   - Provide scripts that show how to properly run on Docker Swarm or similar 
> environments with overlay networking (Kubernetes) without using host 
> networking.
>   - Log to stdout rather than to files.
>   - Work properly with docker-compose for local deployment as well as 
> production deployments (Swarm/k8s)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3205: [FLINK-5635] Improve Docker tooling to make it easier to ...

2017-01-25 Thread jgrier
Github user jgrier commented on the issue:

https://github.com/apache/flink/pull/3205
  
Thanks @ex00, I missed that. Actually, there is a new Flink on Docker stub 
in the top-level docs and I was planning to document these scripts there; 
however, we may want to keep this here and just link to it. Anyway, I will 
update the docs. Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools

2017-01-25 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839036#comment-15839036
 ] 

Jamie Grier commented on FLINK-5635:


[~greghogan] I actually wasn't aware of [FLINK-4326]. However, there does seem 
to be some overlap. There is also [FLINK-5634] and an associated PR, which 
specifically addresses enabling logging to stdout rather than to a file. 
However, neither of these two issues addresses running Flink processes in the 
foreground and avoiding forking as in [FLINK-4326].  I think this remains a 
separate issue that may need to be addressed.

My main motivation in both of these JIRA issues and associated PRs was to 
provide a better Flink on Docker experience, along with some example scripts 
showing how to run Flink properly in container-based environments. I would 
also like to get some "official" Flink Docker images published once we're 
satisfied with them.

> Improve Docker tooling to make it easier to build images and launch Flink via 
> Docker tools
> --
>
> Key: FLINK-5635
> URL: https://issues.apache.org/jira/browse/FLINK-5635
> Project: Flink
>  Issue Type: Improvement
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
>Assignee: Jamie Grier
> Fix For: 1.2.0
>
>
> This is a bit of a catch-all ticket for general improvements to the Flink on 
> Docker experience.
> Things to improve:
>   - Make it possible to build a Docker image from your own flink-dist 
> directory as well as official releases.
>   - Make it possible to override the image name so a user can more easily 
> publish these images to their Docker repository
>   - Provide scripts that show how to properly run on Docker Swarm or similar 
> environments with overlay networking (Kubernetes) without using host 
> networking.
>   - Log to stdout rather than to files.
>   - Work properly with docker-compose for local deployment as well as 
> production deployments (Swarm/k8s)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5634) Flink should not always redirect stdout to a file.

2017-01-25 Thread Jamie Grier (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier reassigned FLINK-5634:
--

Assignee: Jamie Grier

> Flink should not always redirect stdout to a file.
> --
>
> Key: FLINK-5634
> URL: https://issues.apache.org/jira/browse/FLINK-5634
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
>Assignee: Jamie Grier
> Fix For: 1.2.0
>
>
> Flink always redirects stdout to a file.  While often convenient this isn't 
> always what people want.  The most obvious case of this is a Docker 
> deployment.
> It should be possible to have Flink log to stdout.
> Here is a PR for this:  https://github.com/apache/flink/pull/3204



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools

2017-01-25 Thread Jamie Grier (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier reassigned FLINK-5635:
--

Assignee: Jamie Grier

> Improve Docker tooling to make it easier to build images and launch Flink via 
> Docker tools
> --
>
> Key: FLINK-5635
> URL: https://issues.apache.org/jira/browse/FLINK-5635
> Project: Flink
>  Issue Type: Improvement
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
>Assignee: Jamie Grier
> Fix For: 1.2.0
>
>
> This is a bit of a catch-all ticket for general improvements to the Flink on 
> Docker experience.
> Things to improve:
>   - Make it possible to build a Docker image from your own flink-dist 
> directory as well as official releases.
>   - Make it possible to override the image name so a user can more easily 
> publish these images to their Docker repository
>   - Provide scripts that show how to properly run on Docker Swarm or similar 
> environments with overlay networking (Kubernetes) without using host 
> networking.
>   - Log to stdout rather than to files.
>   - Work properly with docker-compose for local deployment as well as 
> production deployments (Swarm/k8s)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5640) configure the explicit Unit Test file suffix

2017-01-25 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-5640:
-
Affects Version/s: 1.2.1

> configure the explicit Unit Test file suffix
> 
>
> Key: FLINK-5640
> URL: https://issues.apache.org/jira/browse/FLINK-5640
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.2.1
>Reporter: shijinkui
>Assignee: shijinkui
>
> There are four types of unit-test file: *ITCase.java, *Test.java, 
> *ITSuite.scala, *Suite.scala
> A file name ending with "IT.java" is an integration test; a file name ending 
> with "Test.java" is a unit test.
> The Surefire plugin's default-test execution should declare explicitly that 
> "*Test.*" is a Java unit test.
> The test file statistics are below:
> * Suite  total: 10
> * ITCase  total: 378
> * Test  total: 1008
> * ITSuite  total: 14



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5649) KryoException when starting job from Flink 1.2.0 rc0 savepoint

2017-01-25 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder updated FLINK-5649:

Description: 
I tried to submit a job using a savepoint taken with Flink 1.2.0 rc0 and 
encountered the following error on Flink 1.2.0 rc2, leading to the job being 
cancelled:

{noformat}
java.lang.IllegalStateException: Could not initialize keyed state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:663)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
class ID: 92
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:798)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
... 6 more
{noformat}

Submitting the same job without a savepoint works fine (except that there's no 
state, of course).

Might be related to FLINK-5484 pull-request 
https://github.com/apache/flink/pull/3152

  was:
I tried to submit a job using a savepoint taken with Flink 1.2.0 rc0 and 
encountered the following error, leading to the job being cancelled:

{noformat}
java.lang.IllegalStateException: Could not initialize keyed state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:663)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
class ID: 92
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:798)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
... 6 more
{noformat}

Submitting the same job without a savepoint works fine (except that there's no 
state, of course).

Might be related to FLINK-5484 pull-request 

[jira] [Created] (FLINK-5649) KryoException when starting job from Flink 1.2.0 rc0 savepoint

2017-01-25 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-5649:
---

 Summary: KryoException when starting job from Flink 1.2.0 rc0 
savepoint
 Key: FLINK-5649
 URL: https://issues.apache.org/jira/browse/FLINK-5649
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.2.0
Reporter: Scott Kidder


I tried to submit a job using a savepoint taken with Flink 1.2.0 rc0 and 
encountered the following error, leading to the job being cancelled:

{noformat}
java.lang.IllegalStateException: Could not initialize keyed state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:663)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
class ID: 92
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:414)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:798)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
... 6 more
{noformat}

Submitting the same job without a savepoint works fine (except that there's no 
state, of course).

Might be related to FLINK-5484 pull-request 
https://github.com/apache/flink/pull/3152
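
For context, this Kryo failure mode can be reproduced with a self-contained 
sketch (not Flink code; the event class below is made up): when the Kryo 
instance that restores state lacks a registration that the writing instance 
had, the numeric class ID in the stream is unknown to the reader.

{noformat}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.ByteArrayOutputStream;

public class KryoMismatchSketch {

	public static class MyEvent { int id = 42; } // hypothetical state class

	public static void main(String[] args) {
		// Writer side (savepoint): MyEvent is registered, so Kryo writes a
		// small numeric class ID instead of the fully qualified class name.
		Kryo writer = new Kryo();
		writer.register(MyEvent.class);

		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		Output out = new Output(bos);
		writer.writeClassAndObject(out, new MyEvent());
		out.close();

		// Reader side (restore): the registration is missing, so the ID in
		// the stream cannot be resolved and deserialization fails with
		// "KryoException: Encountered unregistered class ID: ...".
		Kryo reader = new Kryo();
		reader.readClassAndObject(new Input(bos.toByteArray()));
	}
}
{noformat}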



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5648) Task Manager ID missing from logs link in Job Manager UI

2017-01-25 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838860#comment-15838860
 ] 

Scott Kidder commented on FLINK-5648:
-

May have been introduced in FLINK-5382 pull-request 
https://github.com/apache/flink/pull/3055

> Task Manager ID missing from logs link in Job Manager UI
> 
>
> Key: FLINK-5648
> URL: https://issues.apache.org/jira/browse/FLINK-5648
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> This appears to be a regression introduced in Flink 1.2.0 rc2. I've been 
> using 1.2.0 rc0 without this issue being present.
> The link from the Job Manager to download logs for a specific Task Manager 
> instance does not include the ID of the Task Manager. The following 
> screenshot shows the link:
> !http://imgur.com/dLhxALT.png!
> The following exception appears in the Job Manager logs after trying to 
> retrieve the logs for a Task Manager (without the ID of the Task Manager 
> given):
> {noformat}
> 2017-01-25 23:34:44,915 ERROR 
> org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler  - 
> Fetching TaskManager log failed.
> java.lang.IllegalArgumentException: Argument bytes must by an array of 16 
> bytes
>   at org.apache.flink.util.AbstractID.<init>(AbstractID.java:63)
>   at 
> org.apache.flink.runtime.instance.InstanceID.<init>(InstanceID.java:33)
>   at 
> org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler.respondAsLeader(TaskManagerLogHandler.java:170)
>   at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:90)
>   at 
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
>   at 
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
>   at 
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105)
>   at 
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>   at 
> io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>   at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)

[jira] [Updated] (FLINK-5648) Task Manager ID missing from logs link in Job Manager UI

2017-01-25 Thread Scott Kidder (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Kidder updated FLINK-5648:

Description: 
This appears to be a regression introduced in Flink 1.2.0 rc2. I've been using 
1.2.0 rc0 without this issue being present.

The link from the Job Manager to download logs for a specific Task Manager 
instance does not include the ID of the Task Manager. The following screenshot 
shows the link:

!http://imgur.com/dLhxALT.png!

The following exception appears in the Job Manager logs after trying to 
retrieve the logs for a Task Manager (without the ID of the Task Manager given):

{noformat}
2017-01-25 23:34:44,915 ERROR 
org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler  - Fetching 
TaskManager log failed.
java.lang.IllegalArgumentException: Argument bytes must by an array of 16 bytes
at org.apache.flink.util.AbstractID.<init>(AbstractID.java:63)
at 
org.apache.flink.runtime.instance.InstanceID.<init>(InstanceID.java:33)
at 
org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler.respondAsLeader(TaskManagerLogHandler.java:170)
at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:90)
at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105)
at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at 
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
{noformat}

  was:
This appears to be a regression introduced in Flink 1.2.0 rc2. I've been using 
1.2.0 rc0 without this issue being present.

The link from the Job Manager to download logs for a specific Task Manager 
instance does not include the ID of the Task Manager. The following screenshot 
shows the link:

!http://imgur.com/dLhxALT.png!

The following exception appears in the Job Manager logs after trying to 

[jira] [Created] (FLINK-5648) Task Manager ID missing from logs link in Job Manager UI

2017-01-25 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-5648:
---

 Summary: Task Manager ID missing from logs link in Job Manager UI
 Key: FLINK-5648
 URL: https://issues.apache.org/jira/browse/FLINK-5648
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.2.0
Reporter: Scott Kidder


This appears to be a regression introduced in Flink 1.2.0 rc2. I've been using 
1.2.0 rc0 without this issue being present.

The link from the Job Manager to download logs for a specific Task Manager 
instance does not include the ID of the Task Manager. The following screenshot 
shows the link:

!http://imgur.com/dLhxALT.png!

The following exception appears in the Job Manager logs after trying to 
retrieve the logs for a Task Manager (without the ID of the Task Manager given):

{{
2017-01-25 23:34:44,915 ERROR 
org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler  - Fetching 
TaskManager log failed.
java.lang.IllegalArgumentException: Argument bytes must by an array of 16 bytes
at org.apache.flink.util.AbstractID.<init>(AbstractID.java:63)
at 
org.apache.flink.runtime.instance.InstanceID.<init>(InstanceID.java:33)
at 
org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler.respondAsLeader(TaskManagerLogHandler.java:170)
at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:90)
at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105)
at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at 
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
}}
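
For illustration, a stand-alone sketch (not the actual Flink code; the helper 
below is hypothetical) of why a missing Task Manager ID ends in this exception: 
the empty ID string in the URL decodes to a zero-length byte array, which fails 
the 16-byte check in the AbstractID-style constructor.

{noformat}
public class MissingIdSketch {

	// Hypothetical hex decoder, standing in for the ID parsing in the handler.
	static byte[] hexToBytes(String hex) {
		byte[] bytes = new byte[hex.length() / 2];
		for (int i = 0; i < bytes.length; i++) {
			bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
		}
		return bytes;
	}

	// Mirrors the failing precondition from the stack trace above.
	static void checkId(byte[] bytes) {
		if (bytes == null || bytes.length != 16) {
			throw new IllegalArgumentException("Argument bytes must by an array of 16 bytes");
		}
	}

	public static void main(String[] args) {
		// With the ID missing from the logs link, the handler effectively
		// parses an empty string: zero-length array -> exception.
		checkId(hexToBytes(""));
	}
}
{noformat}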



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3189: [FLINK-5608] [webfrontend] Cancel button stays visible in...

2017-01-25 Thread rehevkor5
Github user rehevkor5 commented on the issue:

https://github.com/apache/flink/pull/3189
  
Thanks @uce, that's helpful! I didn't think of putting screenshots in the PR.

@zentol The global job-state counts didn't seem like the most important 
thing to show when the browser is narrow, given that this view is about a 
particular job. So I added the CSS classes ".hidden-xs .hidden-sm .hidden-md" 
to it; their behavior is described at 
http://getbootstrap.com/css/#responsive-utilities. I am not sure what width 
@uce used for the "wide" screenshots, but it was probably less than 1200px, 
otherwise the counts would show up. I can remove the ".hidden-md" class if you 
want them to show up in the 992px to 1200px range, too... however, that makes 
it more likely for the other, more job-specific elements (start/end, duration) 
to wrap and get lost. Perhaps it would be a good idea to reorder the elements? 
Put start/end and duration first, and the overall job stats last so that they 
are the first to wrap? Let me know if you have a preference.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5608) Cancel button not always visible

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838835#comment-15838835
 ] 

ASF GitHub Bot commented on FLINK-5608:
---

Github user rehevkor5 commented on the issue:

https://github.com/apache/flink/pull/3189
  
Thanks @uce, that's helpful! I didn't think of putting screenshots in the PR.

@zentol The global job-state counts didn't seem like the most important 
thing to show when the browser is narrow, given that this view is about a 
particular job. So I added the CSS classes ".hidden-xs .hidden-sm .hidden-md" 
to it; their behavior is described at 
http://getbootstrap.com/css/#responsive-utilities. I am not sure what width 
@uce used for the "wide" screenshots, but it was probably less than 1200px, 
otherwise the counts would show up. I can remove the ".hidden-md" class if you 
want them to show up in the 992px to 1200px range, too... however, that makes 
it more likely for the other, more job-specific elements (start/end, duration) 
to wrap and get lost. Perhaps it would be a good idea to reorder the elements? 
Put start/end and duration first, and the overall job stats last so that they 
are the first to wrap? Let me know if you have a preference.


> Cancel button not always visible
> 
>
> Key: FLINK-5608
> URL: https://issues.apache.org/jira/browse/FLINK-5608
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.1.4
>Reporter: Shannon Carey
>Assignee: Shannon Carey
>Priority: Minor
>
> When the window is not wide enough, or when the job name is too long, the 
> "Cancel" button in the Job view of the web UI is not visible because it is 
> the first element that gets wrapped down and gets covered by the secondary 
> navbar (the tabs). This causes us to often need to resize the browser wider 
> than our monitor in order to use the cancel button.
> In general, the use of Bootstrap's ".navbar-fixed-top" is problematic if the 
> content may wrap, especially if the content's horizontal width is not known & 
> fixed. The ".navbar-fixed-top" uses fixed positioning, and therefore any 
> unexpected change in height will result in overlap with the rest of the 
> normal-flow content in the page. The Bootstrap docs explain this in their 
> "Overflowing content" callout.
> I am submitting a PR which does not attempt to resolve all issues with the 
> fixed navbar approach, but attempts to improve the situation by using less 
> horizontal space and by altering the layout approach of the Cancel button.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97898531
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+   new HashMap<String, List<Pair<String, TypeInformation<?>>>>();
--- End diff --

could you just put `<>` here instead of `<String, List<Pair<String, TypeInformation<?>>>>`?
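
For reference, a tiny sketch of the suggestion (the Java 7 diamond operator; 
plain JDK types used for brevity):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DiamondSketch {
        // Type arguments repeated on the right-hand side:
        Map<String, List<String>> verbose = new HashMap<String, List<String>>();

        // The diamond operator lets the compiler infer them:
        Map<String, List<String>> concise = new HashMap<>();
    }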


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97903962
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws 
Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : 
value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19992l));
+   puts.add(put);
+
+   put = new Put(ROW_3);
+   // add 3 

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838814#comment-15838814
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97904966
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws 
Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : 
value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97897602
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+   new HashMap<String, List<Pair<String, TypeInformation<?>>>>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class<?>[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, 
String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, 
BigDecimal.class, Date.class, byte[].class
+   };
+   /**
+* Allows specifying the family and qualifier name along with the data 
type of the qualifier for an HBase table
+*
+* @param family the family name
+* @param qualifier the qualifier name
+* @param clazz the data type of the qualifier
+*/
+   public void addColumn(String family, String qualifier, Class<?> clazz) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(family, "qualifier name");
--- End diff --

must be `qualifier`, not `family`
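
i.e., the second check was presumably meant to read:

    Preconditions.checkNotNull(family, "family name");
    Preconditions.checkNotNull(qualifier, "qualifier name");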


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97903708
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
--- End diff --

why public?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838813#comment-15838813
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97900030
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+   new HashMap<String, List<Pair<String, TypeInformation<?>>>>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
--- End diff --

I think, if it's used only in the Table API, java.sql.Date would be good, 
because Calcite uses the SQL date type.
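
A quick stand-alone illustration of the difference (plain JDK, for reference):

    public class DateSketch {
        public static void main(String[] args) {
            // java.util.Date carries date plus time-of-day.
            java.util.Date utilDate = new java.util.Date();

            // java.sql.Date models the SQL DATE type (date portion only),
            // which lines up with Calcite's SQL type system.
            java.sql.Date sqlDate = java.sql.Date.valueOf("2017-01-25");

            System.out.println(utilDate + " vs " + sqlDate);
        }
    }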


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838818#comment-15838818
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97901774
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Creates a table source that helps to scan data from an hbase table
+ *
+ * Note : the colNames are specified along with a familyName and they are 
separated by a ':'
+ * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier 
name
+ */
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource<Row> {
--- End diff --

would a `StreamTableSource` be okay for the HBase source?
I think it can be implemented the same way as the batch `getDataSet`


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838817#comment-15838817
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97898531
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map>> familyMap =
+   new HashMap>>();
--- End diff --

Could you just put `<>` here instead of repeating the type arguments?

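For reference, the suggestion is Java 7's diamond operator, which lets the right-hand side omit the repeated type arguments. A generic illustration (the type arguments of the field above were stripped by the mail renderer, so the `Integer` payload here is a placeholder, not the field's real type):

{code}
import java.util.HashMap;
import java.util.Map;

public class DiamondExample {
    // explicit type arguments on both sides (pre-Java-7 style)
    private final Map<String, Map<String, Integer>> verbose =
        new HashMap<String, Map<String, Integer>>();

    // diamond operator: the compiler infers the arguments from the left-hand side
    private final Map<String, Map<String, Integer>> concise = new HashMap<>();
}
{code}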

> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838816#comment-15838816
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97903708
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter {
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
--- End diff --

why public?


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838815#comment-15838815
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97903962
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter {
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838819#comment-15838819
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97897602
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map>> familyMap =
+   new HashMap>>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the one in java.sql
+   private static Class[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class,
+   Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class, byte[].class
+   };
+   /**
+* Allows specifying the family and qualifier name along with the data type of the qualifier for an HBase table
+*
+* @param family    the family name
+* @param qualifier the qualifier name
+* @param clazz     the data type of the qualifier
+*/
+   public void addColumn(String family, String qualifier, Class clazz) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(family, "qualifier name");
--- End diff --

This must check `qualifier`, not `family` again.

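In other words, the second precondition validates `family` twice and never `qualifier`. A corrected version of those lines (unchanged apart from the argument) would be:

{code}
	public void addColumn(String family, String qualifier, Class clazz) {
		Preconditions.checkNotNull(family, "family name");
		Preconditions.checkNotNull(qualifier, "qualifier name"); // was re-checking family
{code}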

> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97901774
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Creates a table source that helps to scan data from an HBase table.
+ *
+ * Note: the colNames are specified along with a familyName and they are separated by a ':'.
+ * For example, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name.
+ */
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource<Row> {
--- End diff --

Would a `StreamTableSource` be okay for an HBase source?
I think it can be implemented the same way as the batch `getDataSet`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97904966
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter {
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List<Put> puts = new ArrayList<Put>();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19992l));
+   puts.add(put);
+
+   put = new Put(ROW_3);
+   // add 3 

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97900030
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map>> familyMap =
+   new HashMap>>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the one in java.sql
--- End diff --

I think, if it's used only in the Table API, java.sql.Date would be the better choice,
because Calcite uses the SQL date type.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

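For context on the suggestion above: java.sql.Date is a thin subclass of java.util.Date that models a date without a time-of-day, which matches the SQL DATE type Calcite works with. A minimal illustration against the `addColumn` method from the diff (the column names here are made up):

{code}
HBaseTableSchema schema = new HBaseTableSchema();
schema.addColumn("f1", "q1", Integer.class);
// java.sql.Date rather than java.util.Date, so the Table API / Calcite
// sees a SQL DATE instead of a point in time
schema.addColumn("f1", "created_on", java.sql.Date.class);
{code}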

[jira] [Commented] (FLINK-5612) GlobPathFilter not-serializable exception

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838707#comment-15838707
 ] 

ASF GitHub Bot commented on FLINK-5612:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3206
  
Hi @StephanEwen, @zentol 

Thank you for your comments. I've updated the PR accordingly.


> GlobPathFilter not-serializable exception
> -
>
> Key: FLINK-5612
> URL: https://issues.apache.org/jira/browse/FLINK-5612
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Chesnay Schepler
>Assignee: Ivan Mushketyk
>Priority: Blocker
>
> A user reported on the mailing list a non-serializable exception when using 
> the GlobFIlePathFilters.
> It appears that the PathMatchers are all created as anonymous inner classes 
> and thus contain a reference to the encapsulating, non-serializable 
> FileSystem class.
> We can fix this by moving the Matcher instantiation into filterPath(...).
> {code}
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> final TextInputFormat format = new TextInputFormat(new Path("/temp"));
> format.setFilesFilter(new GlobFilePathFilter(
> Collections.singletonList("**"),
> Arrays.asList("**/another_file.bin", "**/dataFile1.txt")
> ));
> DataSet result = env.readFile(format,"/tmp");
> result.writeAsText("/temp/out");
> env.execute("GlobFilePathFilter-Test");
> }
> {code}
> {code}
> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
> Error translating node 'Data Source "at
> readFile(ExecutionEnvironment.java:520)
> (org.apache.flink.api.java.io.TextInputFormat)" : NONE [[ GlobalProperties
> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
> grouped=null, unique=null] ]]': Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: sun.nio.fs.UnixFileSystem$3
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:381)
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:106)
> at
> org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86)
> at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> at
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:192)
> at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:188)
> at
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
> at com.apsaltis.EventDetectionJob.main(EventDetectionJob.java:75)
> Caused by:
> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: sun.nio.fs.UnixFileSystem$3
> at
> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:281)
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:888)
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:281)
> ... 8 more
> Caused by: java.io.NotSerializableException: sun.nio.fs.UnixFileSystem$3
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at java.util.ArrayList.writeObject(ArrayList.java:747)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> 

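A minimal sketch of the fix described above, assuming the filter keeps only the (serializable) pattern strings and builds the non-serializable PathMatchers lazily inside filterPath(...). Class and helper names other than {{GlobFilePathFilter}} and {{filterPath}} are illustrative, not the actual patch:

{code}
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.core.fs.Path;

import java.io.Serializable;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.util.ArrayList;
import java.util.List;

public class GlobFilePathFilter extends FilePathFilter implements Serializable {

	private final List<String> includePatterns;
	private final List<String> excludePatterns;

	// not serialized: PathMatchers capture the JVM-local FileSystem
	private transient List<PathMatcher> includeMatchers;
	private transient List<PathMatcher> excludeMatchers;

	public GlobFilePathFilter(List<String> includePatterns, List<String> excludePatterns) {
		this.includePatterns = includePatterns;
		this.excludePatterns = excludePatterns;
	}

	@Override
	public boolean filterPath(Path filePath) {
		if (includeMatchers == null) {
			// (re)build the matchers lazily, also after deserialization
			includeMatchers = buildMatchers(includePatterns);
			excludeMatchers = buildMatchers(excludePatterns);
		}
		// filter out paths matching no include pattern or any exclude pattern
		return !matches(includeMatchers, filePath) || matches(excludeMatchers, filePath);
	}

	private static List<PathMatcher> buildMatchers(List<String> patterns) {
		List<PathMatcher> matchers = new ArrayList<>(patterns.size());
		for (String p : patterns) {
			matchers.add(FileSystems.getDefault().getPathMatcher("glob:" + p));
		}
		return matchers;
	}

	private static boolean matches(List<PathMatcher> matchers, Path path) {
		java.nio.file.Path nioPath = java.nio.file.Paths.get(path.getPath());
		for (PathMatcher m : matchers) {
			if (m.matches(nioPath)) {
				return true;
			}
		}
		return false;
	}
}
{code}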
[GitHub] flink issue #3206: [FLINK-5612] Fix GlobPathFilter not-serializable exceptio...

2017-01-25 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3206
  
Hi @StephanEwen, @zentol 

Thank you for your comments. I've updated the PR accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5647) Fix RocksDB Backend Cleanup

2017-01-25 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5647:
---

 Summary: Fix RocksDB Backend Cleanup
 Key: FLINK-5647
 URL: https://issues.apache.org/jira/browse/FLINK-5647
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.1.4
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


The RocksDB backend on Flink 1.1.x does not properly clean up the directories 
that it uses. This can lead to overflowing disks when a lot of failure/recovery 
cycles happen.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5630) Followups to AggregationFunction

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838472#comment-15838472
 ] 

ASF GitHub Bot commented on FLINK-5630:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/3207


> Followups to AggregationFunction
> 
>
> Key: FLINK-5630
> URL: https://issues.apache.org/jira/browse/FLINK-5630
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Various followup issues to the aggregation function, like
>   - Allowing different input/output types for the cases where an additional 
> window apply function is specified
>   - Adding the {{aggregate()}} methods to the Scala API
>   - Adding the window translation tests



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

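For context, a hedged illustration of the {{AggregateFunction}} contract the followups quoted above build on: an average with distinct input ({{Long}}), accumulator ({{long[]}}) and output ({{Double}}) types. The signatures follow the interface as eventually released and may differ in detail from the 1.3-era draft.

{code}
import org.apache.flink.api.common.functions.AggregateFunction;

public class AverageAggregate implements AggregateFunction<Long, long[], Double> {

	@Override
	public long[] createAccumulator() {
		return new long[] {0L, 0L}; // {sum, count}
	}

	@Override
	public long[] add(Long value, long[] acc) {
		acc[0] += value;
		acc[1] += 1;
		return acc;
	}

	@Override
	public Double getResult(long[] acc) {
		return acc[1] == 0 ? 0.0 : ((double) acc[0]) / acc[1];
	}

	@Override
	public long[] merge(long[] a, long[] b) {
		a[0] += b[0];
		a[1] += b[1];
		return a;
	}
}
{code}

With an additional window function the window result type can differ yet again, e.g. {{stream.keyBy(...).window(...).aggregate(new AverageAggregate(), new MyWindowFunction())}}, where {{MyWindowFunction}} is hypothetical.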

[jira] [Commented] (FLINK-5630) Followups to AggregationFunction

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838471#comment-15838471
 ] 

ASF GitHub Bot commented on FLINK-5630:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3207
  
Manually merged to `master` in 1542260d52238e87de4fa040e6079465777e8263


> Followups to AggregationFunction
> 
>
> Key: FLINK-5630
> URL: https://issues.apache.org/jira/browse/FLINK-5630
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Various followup issues to the aggregation function, like
>   - Allowing different input/output types for the cases where an additional 
> window apply function is specified
>   - Adding the {{aggregate()}} methods to the Scala API
>   - Adding the window translation tests



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3207: [FLINK-5630] [streaming api] Followups to the AggregateFu...

2017-01-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3207
  
Manually merged to `master` in 1542260d52238e87de4fa040e6079465777e8263


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3207: [FLINK-5630] [streaming api] Followups to the Aggr...

2017-01-25 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/3207


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5632) Typo in StreamGraph

2017-01-25 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5632.
---

> Typo in StreamGraph
> ---
>
> Key: FLINK-5632
> URL: https://issues.apache.org/jira/browse/FLINK-5632
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Trivial
> Fix For: 1.3.0
>
>
> Typo in StreamGraph field : virtuaPartitionNodes -> virtualPartitionNodes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5630) Followups to AggregationFunction

2017-01-25 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5630.
---

> Followups to AggregationFunction
> 
>
> Key: FLINK-5630
> URL: https://issues.apache.org/jira/browse/FLINK-5630
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Various followup issues to the aggregation function, like
>   - Allowing different input/output types for the cases where an additional 
> window apply function is specified
>   - Adding the {{aggregate()}} methods to the Scala API
>   - Adding the window translation tests



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5630) Followups to AggregationFunction

2017-01-25 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5630.
-
Resolution: Fixed

Fixed in 1542260d52238e87de4fa040e6079465777e8263

> Followups to AggregationFunction
> 
>
> Key: FLINK-5630
> URL: https://issues.apache.org/jira/browse/FLINK-5630
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Various followup issues to the aggregation function, like
>   - Allowing different input/output types for the cases where an additional 
> window apply function is specified
>   - Adding the {{aggregate()}} methods to the Scala API
>   - Adding the window translation tests



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5632) Typo in StreamGraph

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838468#comment-15838468
 ] 

ASF GitHub Bot commented on FLINK-5632:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3203


> Typo in StreamGraph
> ---
>
> Key: FLINK-5632
> URL: https://issues.apache.org/jira/browse/FLINK-5632
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Trivial
> Fix For: 1.3.0
>
>
> Typo in StreamGraph field : virtuaPartitionNodes -> virtualPartitionNodes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5632) Typo in StreamGraph

2017-01-25 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5632.
-
   Resolution: Fixed
Fix Version/s: 1.3.0

Fixed via 6f5c7d8a68e3a1fa88902d8d314f350563b76752

> Typo in StreamGraph
> ---
>
> Key: FLINK-5632
> URL: https://issues.apache.org/jira/browse/FLINK-5632
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Trivial
> Fix For: 1.3.0
>
>
> Typo in StreamGraph field : virtuaPartitionNodes -> virtualPartitionNodes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3203: [FLINK-5632] [streaming] Typo in StreamGraph

2017-01-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3203


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4912) Introduce RECONCILING state in ExecutionGraph

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838452#comment-15838452
 ] 

ASF GitHub Bot commented on FLINK-4912:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3113
  
Considering the possible state transitions:

## ExecutionState
  - `RECONCILING` can only be entered from `CREATED`

Simple:
  - `RECONCILING` can go to `RUNNING` if the task was reconciled
  - `RECONCILING` can go to `FAILED` if the task was not reconciled

Complex:
  - For `RECONCILING` to go to `FINISHED` or `CANCELED`, it would mean that 
the TaskManager that has the task would report (when registering at the 
JobManager) a task that is no longer executing. To do that, the TaskManager 
would need to "remember" tasks that completed and for which it did not get an 
acknowledgement from the JobManager for the execution state update. Is that 
anticipated?

## JobStatus
  - `RECONCILING` can only be entered from `CREATED`

Simple:
  - `RECONCILING` can go to `RUNNING` - if all TaskManagers report their 
status and tasks as running
  - `RECONCILING` can go to `FAILING` - if not all tasks were reported.

Complex:
  - For `RECONCILING` to go into `FINISHED`, we'd need the 
`ExecutionState` to be able to go to `FINISHED`.

What do you think about only doing the "simple" option in the first version?


> Introduce RECONCILING state in ExecutionGraph
> -
>
> Key: FLINK-4912
> URL: https://issues.apache.org/jira/browse/FLINK-4912
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Zhijiang Wang
>
> This is part of the non-disruptive JobManager failure recovery.
> I suggest to add a JobStatus and ExecutionState {{RECONCILING}}.
> If a job is started on a that JobManager for master recovery (tbd how to 
> determine that) the {{ExecutionGraph}} and the {{Execution}}s start in the 
> reconciling state.
> From {{RECONCILING}}, tasks can go to {{RUNNING}} (execution reconciled with 
> TaskManager) or to {{FAILED}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

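A compact sketch of the "simple" option from the comment above, written as a standalone transition table. This abstracts away Flink's intermediate states such as SCHEDULED/DEPLOYING, and the helper class is hypothetical, not the ExecutionGraph code:

{code}
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class ReconcilingTransitions {

	enum State { CREATED, RECONCILING, RUNNING, FAILED }

	// RECONCILING can only be entered from CREATED, and can only
	// leave towards RUNNING (reconciled) or FAILED (not reconciled)
	private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);
	static {
		ALLOWED.put(State.CREATED, EnumSet.of(State.RECONCILING, State.RUNNING, State.FAILED));
		ALLOWED.put(State.RECONCILING, EnumSet.of(State.RUNNING, State.FAILED));
		ALLOWED.put(State.RUNNING, EnumSet.of(State.FAILED));
		ALLOWED.put(State.FAILED, EnumSet.noneOf(State.class));
	}

	static boolean isAllowed(State from, State to) {
		return ALLOWED.get(from).contains(to);
	}

	public static void main(String[] args) {
		System.out.println(isAllowed(State.CREATED, State.RECONCILING)); // true
		System.out.println(isAllowed(State.RECONCILING, State.RUNNING)); // true
		System.out.println(isAllowed(State.RUNNING, State.RECONCILING)); // false: no re-entry
	}
}
{code}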

[GitHub] flink issue #3113: [FLINK-4912] Introduce RECONCILIATING state in ExecutionG...

2017-01-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3113
  
Considering the possible state transitions:

## ExecutionState
  - `RECONCILING` can only be entered from `CREATED`

Simple:
  - `RECONCILING` can go to `RUNNING` if the task was reconciled
  - `RECONCILING` can go to `FAILED` if the task was not reconciled

Complex:
  - For `RECONCILING` to go to `FINISHED` or `CANCELED`, it would mean that 
the TaskManager that has the task would report (when registering at the 
JobManager) a task that is no longer executing. To do that, the TaskManager 
would need to "remember" tasks that completed and for which it did not get an 
acknowledgement from the JobManager for the execution state update. Is that 
anticipated?

## JobStatus
  - `RECONCILING` can only be entered from `CREATED`

Simple:
  - `RECONCILING` can go to `RUNNING` - if all TaskManagers report their 
status and tasks as running
  - `RECONCILING` can go to `FAILING` - if not all tasks were reported.

Complex:
  - For `RECONCILING` to go into `FINISHED`, we'd need the 
`ExecutionState` to be able to go to `FINISHED`.

What do you think about only doing the "simple" option in the first version?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5092) Add integration with Sonarqube and code coverage

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838448#comment-15838448
 ] 

ASF GitHub Bot commented on FLINK-5092:
---

Github user BorisOsipov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2836#discussion_r97863068
  
--- Diff: flink-contrib/flink-storm-examples/pom.xml ---
@@ -364,6 +363,18 @@ under the License.



+   
+   shade-flink
--- End diff --

Maven's philosophy is 'one artifact per project'. These projects don't 
follow it and produce several jar artifacts :)
The shade plugin should produce 
flink-storm-streaming_2.10-1.2-SNAPSHOT-shaded.jar, but it doesn't. That is 
a problem for the maven-install-plugin, which expects such a jar by default, 
and it leads to a failure in the install phase without these configuration 
changes. For this reason I added them.

So I propose not to collect test coverage for these projects (flink-storm 
and the streaming examples) at all. What do you think?


> Add integration with Sonarqube and code coverage
> 
>
> Key: FLINK-5092
> URL: https://issues.apache.org/jira/browse/FLINK-5092
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Boris Osipov
>Assignee: Boris Osipov
>
> It would be good to have the opportunity to generate test coverage reports 
> for Flink and analyze code by SonarQube.
> Parts of tasks:
> - add generate test coverage reports for Flink with new maven profile
> - implement integration with https://analysis.apache.org/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2836: [FLINK-5092] Add maven profile with code coverage ...

2017-01-25 Thread BorisOsipov
Github user BorisOsipov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2836#discussion_r97863068
  
--- Diff: flink-contrib/flink-storm-examples/pom.xml ---
@@ -364,6 +363,18 @@ under the License.



+   
+   shade-flink
--- End diff --

Maven's philosophy is 'one artifact per project'. These projects don't 
follow it and produce several jar artifacts :)
The shade plugin should produce 
flink-storm-streaming_2.10-1.2-SNAPSHOT-shaded.jar, but it doesn't. That is 
a problem for the maven-install-plugin, which expects such a jar by default, 
and it leads to a failure in the install phase without these configuration 
changes. For this reason I added them.

So I propose not to collect test coverage for these projects (flink-storm 
and the streaming examples) at all. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5646) REST api documentation missing details on jar upload

2017-01-25 Thread Cliff Resnick (JIRA)
Cliff Resnick created FLINK-5646:


 Summary: REST api documentation missing details on jar upload
 Key: FLINK-5646
 URL: https://issues.apache.org/jira/browse/FLINK-5646
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Cliff Resnick
Priority: Minor


The 1.2 release documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/rest_api.html)
  states "It is possible to upload, run, and list Flink programs via the REST 
APIs and web frontend". However there is no documentation about uploading a jar 
via REST api. 
There should be something to the effect of:
"You can upload a jar file using http post with the file data sent under a form 
field 'jarfile'."



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

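For illustration, a hedged sketch of such an upload from plain Java, sending the jar as multipart/form-data under the form field "jarfile". The /jars/upload path is the endpoint the web frontend uses; the jar path and host here are placeholders.

{code}
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class JarUploadExample {

	public static void main(String[] args) throws IOException {
		Path jar = Paths.get("target/my-flink-job.jar"); // placeholder jar
		String boundary = "----FlinkJarUpload" + System.nanoTime();

		HttpURLConnection conn = (HttpURLConnection)
			new URL("http://localhost:8081/jars/upload").openConnection();
		conn.setRequestMethod("POST");
		conn.setDoOutput(true);
		conn.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary);

		try (OutputStream out = conn.getOutputStream()) {
			// one multipart part, under the form field name "jarfile"
			out.write(("--" + boundary + "\r\n"
				+ "Content-Disposition: form-data; name=\"jarfile\"; filename=\""
				+ jar.getFileName() + "\"\r\n"
				+ "Content-Type: application/x-java-archive\r\n\r\n").getBytes("UTF-8"));
			Files.copy(jar, out); // the jar bytes themselves
			out.write(("\r\n--" + boundary + "--\r\n").getBytes("UTF-8"));
		}

		System.out.println("HTTP " + conn.getResponseCode());
	}
}
{code}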

[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838406#comment-15838406
 ] 

ASF GitHub Bot commented on FLINK-3318:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2361
  
Hi @mushketyk! Yes. In this case we would expect to have everything apart 
from the "end" event in the result, right?


> Add support for quantifiers to CEP's pattern API
> 
>
> Key: FLINK-3318
> URL: https://issues.apache.org/jira/browse/FLINK-3318
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> It would be a good addition to extend the pattern API to support quantifiers 
> known from regular expressions (e.g. Kleene star, ?, +, or count bounds). 
> This would considerably enrich the set of supported patterns.
> Implementing the count bounds could be done by unrolling the pattern state. 
> In order to support the Kleene star operator, the {{NFACompiler}} has to be 
> extended to insert an epsilon-transition between a Kleene start state and the 
> succeeding pattern state. In order to support {{?}}, one could insert two 
> paths from the preceding state, one which accepts the event and another which 
> directly goes into the next pattern state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

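For context, the quantifiers eventually took roughly this shape on the Pattern API (the method names {{oneOrMore}}, {{times}} and {{optional}} mirror the regex quantifiers and come from the released CEP library; the event type and conditions here are made up):

{code}
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

public class QuantifierSketch {

	public static class Event {
		public String name;
	}

	public static void main(String[] args) {
		Pattern<Event, Event> pattern = Pattern.<Event>begin("start")
			.where(new SimpleCondition<Event>() {
				@Override
				public boolean filter(Event e) {
					return "a".equals(e.name);
				}
			})
			.oneOrMore()               // Kleene plus, i.e. regex '+'
			.followedBy("end")
			.where(new SimpleCondition<Event>() {
				@Override
				public boolean filter(Event e) {
					return "b".equals(e.name);
				}
			})
			.optional();               // regex '?'
	}
}
{code}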

[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

2017-01-25 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2361
  
Hi @mushketyk! Yes. In this case we would expect to have everything apart 
from the "end" event in the result, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3113: [FLINK-4912] Introduce RECONCILIATING state in ExecutionG...

2017-01-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3113
  
Will merge this.
To make it properly robust, I will add some tests that validate the state 
transitions of the state machine...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4912) Introduce RECONCILING state in ExecutionGraph

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838403#comment-15838403
 ] 

ASF GitHub Bot commented on FLINK-4912:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3113
  
Will merge this.
To make it properly robust, I will add some tests that validate the state 
transitions of the state machine...


> Introduce RECONCILING state in ExecutionGraph
> -
>
> Key: FLINK-4912
> URL: https://issues.apache.org/jira/browse/FLINK-4912
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Zhijiang Wang
>
> This is part of the non-disruptive JobManager failure recovery.
> I suggest to add a JobStatus and ExecutionState {{RECONCILING}}.
> If a job is started on a that JobManager for master recovery (tbd how to 
> determine that) the {{ExecutionGraph}} and the {{Execution}}s start in the 
> reconciling state.
> From {{RECONCILING}}, tasks can go to {{RUNNING}} (execution reconciled with 
> TaskManager) or to {{FAILED}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #:

2017-01-25 Thread mindprince
Github user mindprince commented on the pull request:


https://github.com/apache/flink/commit/6342d6db1de5f38a921732e35abd83e6c5b9305a#commitcomment-20615018
  
In 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java 
on line 130:
I thought `allowedLateness` affected when we purged windows. Wouldn't this 
result in keeping processing time windows around for longer than we should?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5637) Default Flink configuration contains whitespace characters, causing parser WARNings

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838325#comment-15838325
 ] 

ASF GitHub Bot commented on FLINK-5637:
---

Github user rmetzger closed the pull request at:

https://github.com/apache/flink/pull/3216


> Default Flink configuration contains whitespace characters, causing parser 
> WARNings
> ---
>
> Key: FLINK-5637
> URL: https://issues.apache.org/jira/browse/FLINK-5637
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>  Labels: starter
> Fix For: 1.2.0, 1.3.0
>
>
> {code}
> 2017-01-25 09:45:30,670 WARN  
> org.apache.flink.configuration.GlobalConfiguration- Error while 
> trying to split key and value in configuration file 
> /yarn/nm/usercache/robert/appcache/application_1485249546281_0018/container_1485249546281_0018_01_01/flink-conf.yaml:
>   
> {code}
> The whitespace is currently in line 67:
> {code}
> #==
>  
> # The address under which the web-based runtime monitor listens.
> {code}
> I think we should add a test to the {{GlobalConfigurationTest}} that ensures 
> the configuration file we are shipping doesn't produce any WARNings by 
> default.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

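A sketch of the kind of guard test the description proposes, mirroring the parser's behavior (skip truly empty lines and comments, then split on ": "). The file path and the exact splitting rule are assumptions, not the GlobalConfiguration code:

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class DefaultConfigWhitespaceCheck {

	public static void main(String[] args) throws IOException {
		List<String> lines = Files.readAllLines(
			Paths.get("flink-dist/src/main/resources/flink-conf.yaml"));
		for (int i = 0; i < lines.size(); i++) {
			String raw = lines.get(i);
			String trimmed = raw.trim();
			if (raw.isEmpty() || trimmed.startsWith("#")) {
				continue; // truly empty lines and comments are skipped by the parser
			}
			if (trimmed.isEmpty()) {
				// a whitespace-only line is exactly what triggered the WARNing
				throw new AssertionError("Whitespace-only line " + (i + 1));
			}
			if (trimmed.split(": ", 2).length < 2) {
				throw new AssertionError("Line " + (i + 1) + " is not 'key: value': [" + raw + "]");
			}
		}
		System.out.println("flink-conf.yaml parses without warnings");
	}
}
{code}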

[GitHub] flink pull request #3216: [FLINK-5637] Avoid warning while parsing YAML conf...

2017-01-25 Thread rmetzger
Github user rmetzger closed the pull request at:

https://github.com/apache/flink/pull/3216


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

