[jira] [Commented] (FLINK-21966) Support Kinesis connector in Python DataStream API.

2022-05-29 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543794#comment-17543794
 ] 

Dian Fu commented on FLINK-21966:
-

[~pemide] Thanks for offering. I have assigned it to you~

> Support Kinesis connector in Python DataStream API.
> ---
>
> Key: FLINK-21966
> URL: https://issues.apache.org/jira/browse/FLINK-21966
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Shuiqiang Chen
>Assignee: pengmd
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-21966) Support Kinesis connector in Python DataStream API.

2022-05-29 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-21966:
---

Assignee: pengmd

> Support Kinesis connector in Python DataStream API.
> ---
>
> Key: FLINK-21966
> URL: https://issues.apache.org/jira/browse/FLINK-21966
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Shuiqiang Chen
>Assignee: pengmd
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] chucheng92 commented on pull request #19827: [FLINK-27806][table] Support binary & varbinary types in datagen connector

2022-05-29 Thread GitBox


chucheng92 commented on PR #19827:
URL: https://github.com/apache/flink/pull/19827#issuecomment-1140769198

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] chucheng92 commented on pull request #19827: [FLINK-27806][table] Support binary & varbinary types in datagen connector

2022-05-29 Thread GitBox


chucheng92 commented on PR #19827:
URL: https://github.com/apache/flink/pull/19827#issuecomment-1140768985

   @wuchong can you help me review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] chucheng92 commented on pull request #19827: [FLINK-27806][table] Support binary & varbinary types in datagen connector

2022-05-29 Thread GitBox


chucheng92 commented on PR #19827:
URL: https://github.com/apache/flink/pull/19827#issuecomment-1140768703

   @flinkbot attention @wuchong @lirui-apache 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-21966) Support Kinesis connector in Python DataStream API.

2022-05-29 Thread pengmd (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543792#comment-17543792
 ] 

pengmd commented on FLINK-21966:


[~dianfu] I am very interested in this issue. Could you please assign this 
issue to me?

> Support Kinesis connector in Python DataStream API.
> ---
>
> Key: FLINK-21966
> URL: https://issues.apache.org/jira/browse/FLINK-21966
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

2022-05-29 Thread GitBox


LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884472377


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions)).mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(

Review Comment:
   > You have picked files here, but how to make sure that writer will compact these files?
   
   As discussed offline, the main purpose of `ALTER TABLE COMPACT` is to squeeze files whose key ranges overlap or which are too small. That is not exactly what universal compaction does. As a result, after these files have been picked in the planning phase, the runtime should not pick them again, because they are already picked. So `FileStoreWriteImpl` should create different compact strategies for ① the auto-compaction triggered by ordinary writes vs. ② the manually triggered compaction. For the latter, the strategy should directly return all the files it receives.
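   
   A minimal sketch of that split (the interface and class names below are made up for illustration, not the table-store API):
   ```java
   import java.util.List;
   
   // Sketch only: the strategy shape, generic over the file metadata type.
   interface CompactStrategy<T> {
       List<T> pick(List<T> candidates);
   }
   
   /** Used for the manually triggered compaction: the plan already picked the files. */
   class PickAllStrategy<T> implements CompactStrategy<T> {
       @Override
       public List<T> pick(List<T> candidates) {
           // Do not filter again at runtime; compact everything the plan handed over.
           return candidates;
       }
   }
   ```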



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

2022-05-29 Thread GitBox


LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884472377


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions)).mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(

Review Comment:
   > You have picked files here, but how to make sure that writer will compact these files?
   
   As discussed offline, the main purpose of `ALTER TABLE COMPACT` is to squeeze files whose key ranges overlap or which are too small. That is not exactly what universal compaction does. As a result, after these files have been picked in the planning phase, the runtime should not pick them again, because they are already picked. So `FileStoreWriteImpl` should create different compact strategies for the auto-compaction of normal writes vs. the manually triggered compaction. For the latter, the strategy should directly return all the files it receives.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

2022-05-29 Thread GitBox


LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884456680


##
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java:
##
@@ -256,9 +284,288 @@ public void testCreateAndCheckTableStore(
 }
 }
 
+@Test
+public void testOnCompactTableForNoSnapshot() {
+RowType partType = RowType.of();
+MockTableStoreManagedFactory mockTableStoreManagedFactory =
+new MockTableStoreManagedFactory(partType, 
NON_PARTITIONED_ROW_TYPE);
+prepare(
+TABLE + "_" + UUID.randomUUID(),
+partType,
+NON_PARTITIONED_ROW_TYPE,
+NON_PARTITIONED,
+true);
+        assertThatThrownBy(
+                        () ->
+                                mockTableStoreManagedFactory.onCompactTable(
+                                        context, new CatalogPartitionSpec(emptyMap())))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "The specified table to compact does not exist any snapshot");
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testOnCompactTableForNonPartitioned(boolean rescaleBucket) 
throws Exception {
+RowType partType = RowType.of();
+runTest(
+new MockTableStoreManagedFactory(partType, 
NON_PARTITIONED_ROW_TYPE),
+TABLE + "_" + UUID.randomUUID(),
+partType,
+NON_PARTITIONED_ROW_TYPE,
+NON_PARTITIONED,
+rescaleBucket);
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testOnCompactTableForSinglePartitioned(boolean rescaleBucket) 
throws Exception {
+runTest(
+new MockTableStoreManagedFactory(
+SINGLE_PARTITIONED_PART_TYPE, 
SINGLE_PARTITIONED_ROW_TYPE),
+TABLE + "_" + UUID.randomUUID(),
+SINGLE_PARTITIONED_PART_TYPE,
+SINGLE_PARTITIONED_ROW_TYPE,
+SINGLE_PARTITIONED,
+rescaleBucket);
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testOnCompactTableForMultiPartitioned(boolean rescaleBucket) 
throws Exception {
+runTest(
+new MockTableStoreManagedFactory(),
+TABLE + "_" + UUID.randomUUID(),
+DEFAULT_PART_TYPE,
+DEFAULT_ROW_TYPE,
+MULTI_PARTITIONED,
+rescaleBucket);
+}
+
+@MethodSource("provideManifest")
+@ParameterizedTest

Review Comment:
   > Can we have a name for each parameter? Very difficult to maintain without a name.
   
   I don't quite understand what you mean. Do you suggest not using the JUnit 5 parameterized test? The `@MethodSource` doc illustrates the usage.
   
![image](https://user-images.githubusercontent.com/55568005/170916432-3a9e59ca-3734-4ddb-aaa4-bf825e06f731.png)
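   
   If the ask is simply a readable display name per case, something like this works in plain JUnit 5 (5.8+ for `Named`); the class and method names below are made up for illustration:
   ```java
   import static org.junit.jupiter.api.Named.named;
   import static org.junit.jupiter.params.provider.Arguments.arguments;
   
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Stream;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.Arguments;
   import org.junit.jupiter.params.provider.MethodSource;
   import org.junit.jupiter.params.provider.ValueSource;
   
   class NamedParamsSketch {
   
       // A display name per invocation via the `name` attribute.
       @ParameterizedTest(name = "{index}: rescaleBucket = {0}")
       @ValueSource(booleans = {true, false})
       void rescale(boolean rescaleBucket) {
           // ... assertions ...
       }
   
       // With @MethodSource, each argument set can carry a label via Named.
       @ParameterizedTest(name = "{0}")
       @MethodSource("provideManifest")
       void manifest(List<String> files) {
           // ... assertions ...
       }
   
       static Stream<Arguments> provideManifest() {
           return Stream.of(
                   arguments(named("no files", Arrays.asList())),
                   arguments(named("one small file", Arrays.asList("data-0"))));
       }
   }
   ```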
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #244: [FLINK-27520] Use admission-controller-framework in Webhook

2022-05-29 Thread GitBox


morhidi commented on code in PR #244:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/244#discussion_r884442238


##
flink-kubernetes-webhook/pom.xml:
##
@@ -36,6 +36,19 @@ under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-kubernetes-operator</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.javaoperatorsdk</groupId>
+            <artifactId>operator-framework-framework-core</artifactId>
+            <version>${operator.sdk.admission-controller.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>

Review Comment:
   Had to keep the excludes in the dependency for the unit tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #237: [FLINK-27257] Flink kubernetes operator triggers savepoint failed because of not all tasks running

2022-05-29 Thread GitBox


wangyang0918 commented on PR #237:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/237#issuecomment-1140726331

   For Flink batch jobs, we do not require all the tasks to be running for the Flink job to be considered running. So it does not make sense to use `getJobDetails` to verify whether the job is running. But I agree it could be used to check whether all the tasks are running before triggering a savepoint.
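   
   A rough sketch of that check (assuming a `RestClusterClient` and the REST `JobDetailsInfo` types are available where the savepoint is triggered; this is not the operator's actual code):
   ```java
   import org.apache.flink.api.common.JobID;
   import org.apache.flink.client.program.rest.RestClusterClient;
   import org.apache.flink.runtime.execution.ExecutionState;
   import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
   
   /** Returns true only when every vertex of the job reports RUNNING. */
   static boolean allTasksRunning(RestClusterClient<?> client, JobID jobId) throws Exception {
       JobDetailsInfo details = client.getJobDetails(jobId).get();
       return details.getJobVertexInfos().stream()
               .allMatch(vertex -> vertex.getExecutionState() == ExecutionState.RUNNING);
   }
   
   // Only trigger the savepoint when allTasksRunning(...) is true; otherwise defer
   // and retry on the next reconciliation.
   ```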


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #244: [FLINK-27520] Use admission-controller-framework in Webhook

2022-05-29 Thread GitBox


morhidi commented on code in PR #244:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/244#discussion_r884434216


##
flink-kubernetes-webhook/pom.xml:
##
@@ -36,6 +36,19 @@ under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-kubernetes-operator</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.javaoperatorsdk</groupId>
+            <artifactId>operator-framework-framework-core</artifactId>
+            <version>${operator.sdk.admission-controller.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>

Review Comment:
   Thanks @gyfora for the suggestion, I like it better myself. Changed and 
pushed it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zou-can commented on pull request #19828: [FLINK-27762][connector/kafka] KafkaPartitionSplitReader#wakeup should only unblock KafkaConsumer#poll invocation

2022-05-29 Thread GitBox


zou-can commented on PR #19828:
URL: https://github.com/apache/flink/pull/19828#issuecomment-1140717266

   As I commented on https://issues.apache.org/jira/browse/FLINK-27762
   
   The exception we met is in method **KafkaPartitionSplitReader#removeEmptySplits**, but I can't find any handling of this exception in that method.
   ```java
   // KafkaPartitionSplitReader#removeEmptySplits
   
   private void removeEmptySplits() {
       List<TopicPartition> emptyPartitions = new ArrayList<>();
   
       for (TopicPartition tp : consumer.assignment()) {
           // WakeupException is thrown here.
           // Since KafkaConsumer#wakeup was called before, executing KafkaConsumer#position
           // in the 'if statement' below will throw WakeupException.
           if (consumer.position(tp) >= getStoppingOffset(tp)) {
               emptyPartitions.add(tp);
           }
       }
   
       // ignore irrelevant code...
   }
   ```
   What I'm focused on is **how to handle the WakeupException** after it is thrown.
   
   As is done in **KafkaPartitionSplitReader#fetch**:
   ```java
   // KafkaPartitionSplitReader#fetch
   
   public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {
       ConsumerRecords<byte[], byte[]> consumerRecords;
       try {
           consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
       } catch (WakeupException we) {
           // catch exception and return empty result.
           return new KafkaPartitionSplitRecords(
                   ConsumerRecords.empty(), kafkaSourceReaderMetrics);
       }
   
       // ignore irrelevant code...
   }
   ```
   Maybe we should catch this exception in **KafkaPartitionSplitReader#removeEmptySplits**, and retry **KafkaConsumer#position** again.
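   
   One possible shape of that, sketched against the snippet above (the retry-until-success loop is my assumption, not the actual fix in the PR):
   ```java
   // Sketch: swallow the wakeup that was aimed at poll() and retry position().
   private void removeEmptySplits() {
       List<TopicPartition> emptyPartitions = new ArrayList<>();
       for (TopicPartition tp : consumer.assignment()) {
           long position;
           while (true) {
               try {
                   position = consumer.position(tp);
                   break;
               } catch (WakeupException we) {
                   // The pending wakeup targeted poll(); it is cleared now, so retry position().
               }
           }
           if (position >= getStoppingOffset(tp)) {
               emptyPartitions.add(tp);
           }
       }
       // ignore irrelevant code...
   }
   ```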
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27009) Support SQL job submission in flink kubernetes opeartor

2022-05-29 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-27009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543770#comment-17543770
 ] 

Márton Balassi commented on FLINK-27009:


[~wangyang0918] I do not have well-formed thoughts yet; I was only thinking about SQLScript up to this point, but I do see how that could become tedious and warrant a SQLScriptURI instead, or as an option. For the sake of concise examples it would be nice to be able to drop the SQL into a field, but I am not hell-bent on it.

> Support SQL job submission in flink kubernetes opeartor
> ---
>
> Key: FLINK-27009
> URL: https://issues.apache.org/jira/browse/FLINK-27009
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Biao Geng
>Assignee: Biao Geng
>Priority: Major
>
> Currently, the flink kubernetes operator targets jar jobs using an application or 
> session cluster. For SQL jobs, there is no out-of-the-box solution in the 
> operator.  
> One simple, short-term solution is to wrap the SQL script into a jar job 
> using the Table API, with limitations.
> The long-term solution may work with 
> [FLINK-26541|https://issues.apache.org/jira/browse/FLINK-26541] to achieve 
> full support.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27831) Provide example of Beam on the k8s operator

2022-05-29 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-27831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543769#comment-17543769
 ] 

Márton Balassi commented on FLINK-27831:


On second thought, the example has heavy dependencies on the Beam side; it might 
make more sense for it to live in Beam instead, with only some of the docs on 
the flink-kubernetes-operator side. [~mxm] what do you think?

> Provide example of Beam on the k8s operator
> ---
>
> Key: FLINK-27831
> URL: https://issues.apache.org/jira/browse/FLINK-27831
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Márton Balassi
>Assignee: Márton Balassi
>Priority: Minor
> Fix For: kubernetes-operator-1.1.0
>
>
> Multiple users have asked whether the operator supports Beam jobs in 
> different shapes. I assume that running a Beam job with the 
> current operator ultimately comes down to having the right jars on the 
> classpath / packaged into the user's fatjar.
> At this stage I suggest adding one such example, as providing it might attract 
> new users.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (FLINK-27834) Flink kubernetes operator dockerfile could not work with podman

2022-05-29 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543768#comment-17543768
 ] 

Zili Chen edited comment on FLINK-27834 at 5/30/22 3:42 AM:


Thanks for your notification [~wangyang0918]. I'll take a look today.


was (Author: tison):
Thanks for your notification [~wangyang0918]. I'll try a look today.

> Flink kubernetes operator dockerfile could not work with podman
> ---
>
> Key: FLINK-27834
> URL: https://issues.apache.org/jira/browse/FLINK-27834
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Priority: Major
>
> [1/2] STEP 16/19: COPY *.git ./.git
> Error: error building at STEP "COPY *.git ./.git": checking on sources under 
> "/root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0": Rel: can't 
> make  relative to 
> /root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0; copier: stat: 
> ["/*.git"]: no such file or directory
>  
> podman version
> Client:       Podman Engine
> Version:      4.0.2
> API Version:  4.0.2
>  
>  
> I think the root cause is "*.git" is not respected by podman. Maybe we could 
> simply copy the whole directory when building the image.
>  
> {code:java}
> WORKDIR /app
> COPY . .
> RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl 
> !flink-kubernetes-docs -DskipTests=$SKIP_TESTS {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27834) Flink kubernetes operator dockerfile could not work with podman

2022-05-29 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543768#comment-17543768
 ] 

Zili Chen commented on FLINK-27834:
---

Thanks for your notification [~wangyang0918]. I'll try a look today.

> Flink kubernetes operator dockerfile could not work with podman
> ---
>
> Key: FLINK-27834
> URL: https://issues.apache.org/jira/browse/FLINK-27834
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Priority: Major
>
> [1/2] STEP 16/19: COPY *.git ./.git
> Error: error building at STEP "COPY *.git ./.git": checking on sources under 
> "/root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0": Rel: can't 
> make  relative to 
> /root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0; copier: stat: 
> ["/*.git"]: no such file or directory
>  
> podman version
> Client:       Podman Engine
> Version:      4.0.2
> API Version:  4.0.2
>  
>  
> I think the root cause is "*.git" is not respected by podman. Maybe we could 
> simply copy the whole directory when building the image.
>  
> {code:java}
> WORKDIR /app
> COPY . .
> RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl 
> !flink-kubernetes-docs -DskipTests=$SKIP_TESTS {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

2022-05-29 Thread GitBox


LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884394501


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions)).mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                List<DataFileMeta> smallFiles =

Review Comment:
   > For example:
   > inputs: File1(0-10) File2(10-90) File3(90-100)
   > merged: File4(0-100) File2(10-90)
   > This can lead to results with overlapping.
   
   I agree with you that "we cannot pick small files at random", but the example you provide does not show this. These three files all overlap. The small-file threshold will pick File1(0-10) and File3(90-100), and the interval partition will pick all of them, so after deduplication they all get compacted.
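   
   A toy illustration of why boundary-overlapping files are all picked (the `KeyedFile` type and the combined small-or-overlapping rule are hypothetical, not the PR's code):
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class PickFilesSketch {
       /** Hypothetical stand-in for a data file's key range and size. */
       static class KeyedFile {
           final String name;
           final long minKey, maxKey, sizeBytes;
           KeyedFile(String name, long minKey, long maxKey, long sizeBytes) {
               this.name = name; this.minKey = minKey; this.maxKey = maxKey; this.sizeBytes = sizeBytes;
           }
       }
   
       static boolean overlaps(KeyedFile a, KeyedFile b) {
           return a.minKey <= b.maxKey && b.minKey <= a.maxKey;
       }
   
       /** Pick every file that is small or overlaps some other file. */
       static List<KeyedFile> pick(List<KeyedFile> files, long smallThreshold) {
           List<KeyedFile> picked = new ArrayList<>();
           for (KeyedFile f : files) {
               boolean small = f.sizeBytes < smallThreshold;
               boolean overlapping = files.stream().anyMatch(g -> g != f && overlaps(f, g));
               if (small || overlapping) {
                   picked.add(f);
               }
           }
           return picked;
       }
   
       public static void main(String[] args) {
           List<KeyedFile> files = List.of(
                   new KeyedFile("File1", 0, 10, 1),
                   new KeyedFile("File2", 10, 90, 100),
                   new KeyedFile("File3", 90, 100, 1));
           // File1/File2 and File2/File3 share boundary keys, so all three are picked
           // and the merge cannot skip the middle data.
           pick(files, 10).forEach(f -> System.out.println(f.name));
       }
   }
   ```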
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27831) Provide example of Beam on the k8s operator

2022-05-29 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-27831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543767#comment-17543767
 ] 

Márton Balassi commented on FLINK-27831:


I managed to get the Beam 
[WordCount|https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java]
 example based on the 
[quickstart|https://beam.apache.org/get-started/quickstart-java/#get-the-example-code]
 running with Flink 1.14 on the operator.

It needs a bit of tweaking to make it pretty, will create a PR soonish.

> Provide example of Beam on the k8s operator
> ---
>
> Key: FLINK-27831
> URL: https://issues.apache.org/jira/browse/FLINK-27831
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Márton Balassi
>Assignee: Márton Balassi
>Priority: Minor
> Fix For: kubernetes-operator-1.1.0
>
>
> Multiple users have asked whether the operator supports Beam jobs in 
> different shapes. I assume that running a Beam job with the 
> current operator ultimately comes down to having the right jars on the 
> classpath / packaged into the user's fatjar.
> At this stage I suggest adding one such example, as providing it might attract 
> new users.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27762) Kafka WakeUpException during handling splits changes

2022-05-29 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543766#comment-17543766
 ] 

Qingsheng Ren commented on FLINK-27762:
---

Thanks a lot for the review [~zoucan] ! What about moving the discussion about 
code and PRs to Github?

> Kafka WakeUpException during handling splits changes
> 
>
> Key: FLINK-27762
> URL: https://issues.apache.org/jira/browse/FLINK-27762
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.3
>Reporter: zoucan
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
>
>  
> We enable dynamic partition discovery in our flink job, but job failed when 
> kafka partition is changed. 
> Exception detail is shown as follows:
> {code:java}
> java.lang.RuntimeException: One or more fetchers have encountered exception
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   ... 1 more
> Caused by: org.apache.kafka.common.errors.WakeupException
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1726)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
>   at 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:315)
>   at 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:200)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>   ... 6 more {code}
>  
> After a preliminary investigation of the KafkaSource source code:
> At first: 
> method *org.apache.kafka.clients.consumer.KafkaConsumer.wakeup()* will be 
> called if the consumer is polling data.
> Later: 
> method *org.apache.kafka.clients.consumer.KafkaConsumer.position()* will be 
> called while handling splits changes.
> Since the consumer has been woken up, it will throw WakeupException.



--
This message was sent

[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

2022-05-29 Thread GitBox


LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884390344


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java:
##
@@ -61,20 +61,24 @@ public FileStorePathFactory(Path root, RowType partitionType, String defaultPart
         this.root = root;
         this.uuid = UUID.randomUUID().toString();
 
-        String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]);
-        this.partitionComputer =
-                new RowDataPartitionComputer(
-                        defaultPartValue,
-                        partitionColumns,
-                        partitionType.getFields().stream()
-                                .map(f -> LogicalTypeDataTypeConverter.toDataType(f.getType()))
-                                .toArray(DataType[]::new),
-                        partitionColumns);
+        this.partitionComputer = getPartitionComputer(partitionType, defaultPartValue);
 
         this.manifestFileCount = new AtomicInteger(0);
         this.manifestListCount = new AtomicInteger(0);
     }
 
+    public static RowDataPartitionComputer getPartitionComputer(

Review Comment:
   > Just for test?
   
   Maybe for logging too. Currently, the log does not show a readable partition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

2022-05-29 Thread GitBox


JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884388037


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions)).mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                List<DataFileMeta> smallFiles =

Review Comment:
   Here we cannot pick small files at random, because that would cause the merged 
file's min and max to skip the middle data.
   For example:
   inputs: File1(0-10) File2(10-90) File3(90-100)
   merged: File4(0-100) File2(10-90)
   This can lead to results with overlapping key ranges.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27834) Flink kubernetes operator dockerfile could not work with podman

2022-05-29 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543762#comment-17543762
 ] 

Yang Wang commented on FLINK-27834:
---

cc [~Tison] I remember you also have the same idea when fixing FLINK-27746.

> Flink kubernetes operator dockerfile could not work with podman
> ---
>
> Key: FLINK-27834
> URL: https://issues.apache.org/jira/browse/FLINK-27834
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Priority: Major
>
> [1/2] STEP 16/19: COPY *.git ./.git
> Error: error building at STEP "COPY *.git ./.git": checking on sources under 
> "/root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0": Rel: can't 
> make  relative to 
> /root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0; copier: stat: 
> ["/*.git"]: no such file or directory
>  
> podman version
> Client:       Podman Engine
> Version:      4.0.2
> API Version:  4.0.2
>  
>  
> I think the root cause is "*.git" is not respected by podman. Maybe we could 
> simply copy the whole directory when building the image.
>  
> {code:java}
> WORKDIR /app
> COPY . .
> RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl 
> !flink-kubernetes-docs -DskipTests=$SKIP_TESTS {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27834) Flink kubernetes operator dockerfile could not work with podman

2022-05-29 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543761#comment-17543761
 ] 

Gyula Fora commented on FLINK-27834:


Maybe this is not a big concern, would love to hear what [~matyas] or 
[~jbusche] think

> Flink kubernetes operator dockerfile could not work with podman
> ---
>
> Key: FLINK-27834
> URL: https://issues.apache.org/jira/browse/FLINK-27834
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Priority: Major
>
> [1/2] STEP 16/19: COPY *.git ./.git
> Error: error building at STEP "COPY *.git ./.git": checking on sources under 
> "/root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0": Rel: can't 
> make  relative to 
> /root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0; copier: stat: 
> ["/*.git"]: no such file or directory
>  
> podman version
> Client:       Podman Engine
> Version:      4.0.2
> API Version:  4.0.2
>  
>  
> I think the root cause is "*.git" is not respected by podman. Maybe we could 
> simply copy the whole directory when building the image.
>  
> {code:java}
> WORKDIR /app
> COPY . .
> RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl 
> !flink-kubernetes-docs -DskipTests=$SKIP_TESTS {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27834) Flink kubernetes operator dockerfile could not work with podman

2022-05-29 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543760#comment-17543760
 ] 

Gyula Fora commented on FLINK-27834:


yes, the user can have anything in their local directory by accident

> Flink kubernetes operator dockerfile could not work with podman
> ---
>
> Key: FLINK-27834
> URL: https://issues.apache.org/jira/browse/FLINK-27834
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Priority: Major
>
> [1/2] STEP 16/19: COPY *.git ./.git
> Error: error building at STEP "COPY *.git ./.git": checking on sources under 
> "/root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0": Rel: can't 
> make  relative to 
> /root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0; copier: stat: 
> ["/*.git"]: no such file or directory
>  
> podman version
> Client:       Podman Engine
> Version:      4.0.2
> API Version:  4.0.2
>  
>  
> I think the root cause is "*.git" is not respected by podman. Maybe we could 
> simply copy the whole directory when building the image.
>  
> {code:java}
> WORKDIR /app
> COPY . .
> RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl 
> !flink-kubernetes-docs -DskipTests=$SKIP_TESTS {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27834) Flink kubernetes operator dockerfile could not work with podman

2022-05-29 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543759#comment-17543759
 ] 

Yang Wang commented on FLINK-27834:
---

Do you mean we might accidentally bundle the credentials into the image?

> Flink kubernetes operator dockerfile could not work with podman
> ---
>
> Key: FLINK-27834
> URL: https://issues.apache.org/jira/browse/FLINK-27834
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Priority: Major
>
> [1/2] STEP 16/19: COPY *.git ./.git
> Error: error building at STEP "COPY *.git ./.git": checking on sources under 
> "/root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0": Rel: can't 
> make  relative to 
> /root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0; copier: stat: 
> ["/*.git"]: no such file or directory
>  
> podman version
> Client:       Podman Engine
> Version:      4.0.2
> API Version:  4.0.2
>  
>  
> I think the root cause is "*.git" is not respected by podman. Maybe we could 
> simply copy the whole directory when building the image.
>  
> {code:java}
> WORKDIR /app
> COPY . .
> RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl 
> !flink-kubernetes-docs -DskipTests=$SKIP_TESTS {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-25865) Support to set restart policy of TaskManager pod for native K8s integration

2022-05-29 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang reassigned FLINK-25865:
-

Assignee: Aitozi

> Support to set restart policy of TaskManager pod for native K8s integration
> ---
>
> Key: FLINK-25865
> URL: https://issues.apache.org/jira/browse/FLINK-25865
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Reporter: Yang Wang
>Assignee: Aitozi
>Priority: Major
>
> After FLIP-201, Flink's TaskManagers will be able to be restarted without 
> losing their local state. So it is reasonable to make the restart policy [1] of 
> the TaskManager pod configurable.
> The current restart policy is {{Never}}. Flink will always delete the 
> failed TaskManager pod directly and create a new one instead. This ticket 
> could help to decrease the recovery time after a TaskManager failure.
>  
> Please note that the working directory needs to be located in the 
> emptyDir [2], which is retained across restarts.
>  
> [1]. 
> https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#restart-policy
> [2]. https://kubernetes.io/docs/concepts/storage/volumes/#emptydir



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27834) Flink kubernetes operator dockerfile could not work with podman

2022-05-29 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543758#comment-17543758
 ] 

Gyula Fora commented on FLINK-27834:


I have bad feelings about this, it might lead to accidentally leaking 
credentials from local working directories.

> Flink kubernetes operator dockerfile could not work with podman
> ---
>
> Key: FLINK-27834
> URL: https://issues.apache.org/jira/browse/FLINK-27834
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Priority: Major
>
> [1/2] STEP 16/19: COPY *.git ./.git
> Error: error building at STEP "COPY *.git ./.git": checking on sources under 
> "/root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0": Rel: can't 
> make  relative to 
> /root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0; copier: stat: 
> ["/*.git"]: no such file or directory
>  
> podman version
> Client:       Podman Engine
> Version:      4.0.2
> API Version:  4.0.2
>  
>  
> I think the root cause is "*.git" is not respected by podman. Maybe we could 
> simply copy the whole directory when building the image.
>  
> {code:java}
> WORKDIR /app
> COPY . .
> RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl 
> !flink-kubernetes-docs -DskipTests=$SKIP_TESTS {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (FLINK-27762) Kafka WakeUpException during handling splits changes

2022-05-29 Thread zoucan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543756#comment-17543756
 ] 

zoucan edited comment on FLINK-27762 at 5/30/22 2:50 AM:
-

[~renqs] 

Thanks for your effort on this issue.

I've reviewed your PR and I have a question. The exception I met is in method 
{*}KafkaPartitionSplitReader#removeEmptySplits{*}, but I can't find any handling 
of this exception in that method.
{code:java}
// 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader#removeEmptySplits

private void removeEmptySplits() {
List<TopicPartition> emptyPartitions = new ArrayList<>();
   
for (TopicPartition tp : consumer.assignment()) {
// WakeUpException is thrown here.
// since KafkaConsumer#wakeUp is called before,if execute 
KafkaConsumer#postion() in 'if statement' above, it will throw WakeUpException. 
  
if (consumer.position(tp) >= getStoppingOffset(tp)) {
emptyPartitions.add(tp);
}
}

   // ignore irrelevant code...     
} {code}
Maybe we should check whether *KafkaConsumer#wakeup* has been called before, or catch 
WakeupException in *KafkaPartitionSplitReader#removeEmptySplits*.


was (Author: JIRAUSER289972):
[~renqs] 

Thanks for your effort for this issue.

I‘ve reviewed your pr and i have a question. The exception i met is in method 
{*}KafkaPartitionSplitReader#removeEmptySplits{*}. But i can't find any action 
for handling this exception.
{code:java}
// 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader#removeEmptySplits

private void removeEmptySplits() {
List<TopicPartition> emptyPartitions = new ArrayList<>();
   
for (TopicPartition tp : consumer.assignment()) {
// WakeUpException is thrown here.
// since KafkaConsumer#wakeUp is called before,if execute 
KafkaConsumer#postion() in 'if statement' above, it will throw WakeUpException. 
  
if (consumer.position(tp) >= getStoppingOffset(tp)) {
emptyPartitions.add(tp);
}
}

   // ignore irrelevant code...     
} {code}
Maybe we should check whether *KafkaConsumer#wakeUp* is called before, or catch 
WakeUpException in *KafkaPartitionSplitReader#removeEmptySplits.*

> Kafka WakeUpException during handling splits changes
> 
>
> Key: FLINK-27762
> URL: https://issues.apache.org/jira/browse/FLINK-27762
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.3
>Reporter: zoucan
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
>
>  
> We enable dynamic partition discovery in our flink job, but job failed when 
> kafka partition is changed. 
> Exception detail is shown as follows:
> {code:java}
> java.lang.RuntimeException: One or more fetchers have encountered exception
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at jav

[GitHub] [flink] flinkbot commented on pull request #19843: [hotfix] Fix comment typo in TypeExtractor class

2022-05-29 Thread GitBox


flinkbot commented on PR #19843:
URL: https://github.com/apache/flink/pull/19843#issuecomment-1140628091

   
   ## CI report:
   
   * 056a8e7e0028e39bd5428a8b40b22454e01a0560 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-27762) Kafka WakeUpException during handling splits changes

2022-05-29 Thread zoucan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543756#comment-17543756
 ] 

zoucan edited comment on FLINK-27762 at 5/30/22 2:46 AM:
-

[~renqs] 

Thanks for your effort for this issue.

I‘ve reviewed your pr and i have a question. The exception i met is in method 
{*}KafkaPartitionSplitReader#removeEmptySplits{*}. But i can't find any action 
for handling this exception.
{code:java}
// 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader#removeEmptySplits

private void removeEmptySplits() {
List<TopicPartition> emptyPartitions = new ArrayList<>();
   
for (TopicPartition tp : consumer.assignment()) {
// WakeUpException is thrown here.
// since KafkaConsumer#wakeUp is called before,if execute 
KafkaConsumer#postion() in 'if statement' above, it will throw WakeUpException. 
  
if (consumer.position(tp) >= getStoppingOffset(tp)) {
emptyPartitions.add(tp);
}
}

   // ignore irrelevant code...     
} {code}
Maybe we should check whether *KafkaConsumer#wakeUp* is called before, or catch 
WakeUpException in *KafkaPartitionSplitReader#removeEmptySplits.*


was (Author: JIRAUSER289972):
[~renqs] 

Thanks for your effort for this issue.

I‘ve reviewed your pr and i have a question. The exception i met is in method 
{*}KafkaPartitionSplitReader#removeEmptySplits{*}. But i can't find any action 
to handling this exception.
{code:java}
// 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader#removeEmptySplits

private void removeEmptySplits() {
List<TopicPartition> emptyPartitions = new ArrayList<>();
   
for (TopicPartition tp : consumer.assignment()) {
// WakeUpException is thrown here.
// since KafkaConsumer#wakeUp is called before,if execute 
KafkaConsumer#postion() in 'if statement' above, it will throw WakeUpException. 
  
if (consumer.position(tp) >= getStoppingOffset(tp)) {
emptyPartitions.add(tp);
}
}

   // ignore irrelevant code...     
} {code}
Maybe we should check whether *KafkaConsumer#wakeUp* is called before, or catch 
WakeUpException in *KafkaPartitionSplitReader#removeEmptySplits.*

> Kafka WakeUpException during handling splits changes
> 
>
> Key: FLINK-27762
> URL: https://issues.apache.org/jira/browse/FLINK-27762
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.3
>Reporter: zoucan
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
>
>  
> We enable dynamic partition discovery in our flink job, but job failed when 
> kafka partition is changed. 
> Exception detail is shown as follows:
> {code:java}
> java.lang.RuntimeException: One or more fetchers have encountered exception
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurren

[jira] [Comment Edited] (FLINK-27762) Kafka WakeUpException during handling splits changes

2022-05-29 Thread zoucan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543756#comment-17543756
 ] 

zoucan edited comment on FLINK-27762 at 5/30/22 2:45 AM:
-

[~renqs] 

Thanks for your effort on this issue.

I've reviewed your PR and I have a question. The exception I met is in the method 
{*}KafkaPartitionSplitReader#removeEmptySplits{*}, but I can't find any handling for 
this exception.
{code:java}
// org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader#removeEmptySplits

private void removeEmptySplits() {
    List<TopicPartition> emptyPartitions = new ArrayList<>();

    for (TopicPartition tp : consumer.assignment()) {
        // The WakeupException is thrown here: if KafkaConsumer#wakeup() has been
        // called before, invoking KafkaConsumer#position() in the condition below
        // throws a WakeupException.
        if (consumer.position(tp) >= getStoppingOffset(tp)) {
            emptyPartitions.add(tp);
        }
    }

    // irrelevant code omitted ...
} {code}
Maybe we should check whether *KafkaConsumer#wakeup* has been called before, or catch the 
WakeupException in *KafkaPartitionSplitReader#removeEmptySplits.*


was (Author: JIRAUSER289972):
[~renqs] 

Thanks for your effort for this issue.

I‘ve reviewed your pr and i have a question. The exception i met is in method 
{*}KafkaPartitionSplitReader#removeEmptySplits{*}. But i can't find any action 
to handling this exception.

 
{code:java}
// 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader#removeEmptySplits

private void removeEmptySplits() {
List emptyPartitions = new ArrayList<>();
   
for (TopicPartition tp : consumer.assignment()) {
// WakeUpException is thrown here.
// since KafkaConsumer#wakeUp is called before,if execute 
KafkaConsumer#postion() in 'if statement' above, it will throw WakeUpException. 
  
if (consumer.position(tp) >= getStoppingOffset(tp)) {
emptyPartitions.add(tp);
}
}

   // ignore irrelevant code...     
} {code}
 

Maybe we should check whether *KafkaConsumer#wakeUp* is called before, or catch 
WakeUpException in *KafkaPartitionSplitReader#removeEmptySplits.*

> Kafka WakeUpException during handling splits changes
> 
>
> Key: FLINK-27762
> URL: https://issues.apache.org/jira/browse/FLINK-27762
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.3
>Reporter: zoucan
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
>
>  
> We enable dynamic partition discovery in our flink job, but job failed when 
> kafka partition is changed. 
> Exception detail is shown as follows:
> {code:java}
> java.lang.RuntimeException: One or more fetchers have encountered exception
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.conc

[GitHub] [flink] deadwind4 opened a new pull request, #19843: [hotfix] Fix comment typo in TypeExtractor class

2022-05-29 Thread GitBox


deadwind4 opened a new pull request, #19843:
URL: https://github.com/apache/flink/pull/19843

   Fix comment typo in TypeExtractor class. Add a `the` before `highest`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

2022-05-29 Thread GitBox


JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884378698


##
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java:
##
@@ -256,9 +284,288 @@ public void testCreateAndCheckTableStore(
 }
 }
 
+@Test
+public void testOnCompactTableForNoSnapshot() {
+RowType partType = RowType.of();
+MockTableStoreManagedFactory mockTableStoreManagedFactory =
+new MockTableStoreManagedFactory(partType, 
NON_PARTITIONED_ROW_TYPE);
+prepare(
+TABLE + "_" + UUID.randomUUID(),
+partType,
+NON_PARTITIONED_ROW_TYPE,
+NON_PARTITIONED,
+true);
+assertThatThrownBy(
+() ->
+mockTableStoreManagedFactory.onCompactTable(
+context, new 
CatalogPartitionSpec(emptyMap(
+.isInstanceOf(IllegalStateException.class)
+.hasMessageContaining("The specified table to compact does not 
exist any snapshot");
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testOnCompactTableForNonPartitioned(boolean rescaleBucket) 
throws Exception {
+RowType partType = RowType.of();
+runTest(
+new MockTableStoreManagedFactory(partType, 
NON_PARTITIONED_ROW_TYPE),
+TABLE + "_" + UUID.randomUUID(),
+partType,
+NON_PARTITIONED_ROW_TYPE,
+NON_PARTITIONED,
+rescaleBucket);
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testOnCompactTableForSinglePartitioned(boolean rescaleBucket) 
throws Exception {
+runTest(
+new MockTableStoreManagedFactory(
+SINGLE_PARTITIONED_PART_TYPE, 
SINGLE_PARTITIONED_ROW_TYPE),
+TABLE + "_" + UUID.randomUUID(),
+SINGLE_PARTITIONED_PART_TYPE,
+SINGLE_PARTITIONED_ROW_TYPE,
+SINGLE_PARTITIONED,
+rescaleBucket);
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testOnCompactTableForMultiPartitioned(boolean rescaleBucket) 
throws Exception {
+runTest(
+new MockTableStoreManagedFactory(),
+TABLE + "_" + UUID.randomUUID(),
+DEFAULT_PART_TYPE,
+DEFAULT_ROW_TYPE,
+MULTI_PARTITIONED,
+rescaleBucket);
+}
+
+@MethodSource("provideManifest")
+@ParameterizedTest

Review Comment:
   Can we have a name for each parameter set?
   It is very difficult to maintain without names.
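   
   For example, something like this might work (a sketch assuming JUnit 5.8+, with made-up case names and arguments purely for illustration):
   
   ```java
   import org.junit.jupiter.api.Named;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.Arguments;
   import org.junit.jupiter.params.provider.MethodSource;
   
   import java.util.stream.Stream;
   
   class NamedParameterizedCaseExample {
   
       static Stream<Arguments> provideManifest() {
           // Named.of attaches a human-readable label to each case.
           return Stream.of(
                   Arguments.of(Named.of("single-bucket-small-files", 1), true),
                   Arguments.of(Named.of("multi-bucket-intersected-files", 2), false));
       }
   
       // The display name then shows the label instead of the raw argument values.
       @ParameterizedTest(name = "{0}, rescaleBucket={1}")
       @MethodSource("provideManifest")
       void testPickManifest(int bucketCount, boolean rescaleBucket) {
           // test body omitted
       }
   }
   ```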



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27762) Kafka WakeUpException during handling splits changes

2022-05-29 Thread zoucan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543756#comment-17543756
 ] 

zoucan commented on FLINK-27762:


[~renqs] 

Thanks for your effort on this issue.

I've reviewed your PR and I have a question. The exception I met is in the method 
{*}KafkaPartitionSplitReader#removeEmptySplits{*}, but I can't find any handling for 
this exception.

 
{code:java}
// org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader#removeEmptySplits

private void removeEmptySplits() {
    List<TopicPartition> emptyPartitions = new ArrayList<>();

    for (TopicPartition tp : consumer.assignment()) {
        // The WakeupException is thrown here: if KafkaConsumer#wakeup() has been
        // called before, invoking KafkaConsumer#position() in the condition below
        // throws a WakeupException.
        if (consumer.position(tp) >= getStoppingOffset(tp)) {
            emptyPartitions.add(tp);
        }
    }

    // irrelevant code omitted ...
} {code}
 

 

Maybe we should check whether *KafkaConsumer#wakeup* has been called before, or catch the 
WakeupException in *KafkaPartitionSplitReader#removeEmptySplits.*

 

 

 

> Kafka WakeUpException during handling splits changes
> 
>
> Key: FLINK-27762
> URL: https://issues.apache.org/jira/browse/FLINK-27762
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.3
>Reporter: zoucan
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
>
>  
> We enable dynamic partition discovery in our flink job, but job failed when 
> kafka partition is changed. 
> Exception detail is shown as follows:
> {code:java}
> java.lang.RuntimeException: One or more fetchers have encountered exception
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   ... 1 more
> Caused by: org.apache.kafka.common.errors.WakeupException
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1726)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
>   at 
> org.apache.flink.connector.kafka.source.reader.KafkaPartiti

[jira] [Comment Edited] (FLINK-27762) Kafka WakeUpException during handling splits changes

2022-05-29 Thread zoucan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543756#comment-17543756
 ] 

zoucan edited comment on FLINK-27762 at 5/30/22 2:44 AM:
-

[~renqs] 

Thanks for your effort on this issue.

I've reviewed your PR and I have a question. The exception I met is in the method 
{*}KafkaPartitionSplitReader#removeEmptySplits{*}, but I can't find any handling for 
this exception.

 
{code:java}
// org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader#removeEmptySplits

private void removeEmptySplits() {
    List<TopicPartition> emptyPartitions = new ArrayList<>();

    for (TopicPartition tp : consumer.assignment()) {
        // The WakeupException is thrown here: if KafkaConsumer#wakeup() has been
        // called before, invoking KafkaConsumer#position() in the condition below
        // throws a WakeupException.
        if (consumer.position(tp) >= getStoppingOffset(tp)) {
            emptyPartitions.add(tp);
        }
    }

    // irrelevant code omitted ...
} {code}
 

Maybe we should check whether *KafkaConsumer#wakeup* has been called before, or catch the 
WakeupException in *KafkaPartitionSplitReader#removeEmptySplits.*


was (Author: JIRAUSER289972):
[~renqs] 

Thanks for your effort for this issue.

I‘ve reviewed your pr and i have a question. The exception i met is in method 
{*}KafkaPartitionSplitReader#removeEmptySplits{*}. But i can't find any action 
to handling this exception.

 
{code:java}
// 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader#removeEmptySplits

private void removeEmptySplits() {
List emptyPartitions = new ArrayList<>();
   
for (TopicPartition tp : consumer.assignment()) {
// WakeUpException is thrown here.
// since KafkaConsumer#wakeUp is called before,if execute 
KafkaConsumer#postion() in 'if statement' above, it will throw WakeUpException. 
  
if (consumer.position(tp) >= getStoppingOffset(tp)) {
emptyPartitions.add(tp);
}
}

   // ignore irrelevant code...     
} {code}
 

 

Maybe we should check whether *KafkaConsumer#wakeUp* is called before, or catch 
WakeUpException in *KafkaPartitionSplitReader#removeEmptySplits.*

 

 

 

> Kafka WakeUpException during handling splits changes
> 
>
> Key: FLINK-27762
> URL: https://issues.apache.org/jira/browse/FLINK-27762
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.3
>Reporter: zoucan
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
>
>  
> We enable dynamic partition discovery in our flink job, but job failed when 
> kafka partition is changed. 
> Exception detail is shown as follows:
> {code:java}
> java.lang.RuntimeException: One or more fetchers have encountered exception
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

2022-05-29 Thread GitBox


JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884379183


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##
@@ -55,14 +56,14 @@ public FileStoreReadImpl(
 WriteMode writeMode,
 RowType keyType,
 RowType valueType,
-Comparator keyComparator,
+Supplier> keyComparatorSupplier,

Review Comment:
   Why does a util method change `Comparator` to `Supplier`?
   We shouldn't create it repeatedly unless there is a thread-safety issue here.



##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean 
ignoreIfNotExists) {
 @Override
 public Map onCompactTable(
 Context context, CatalogPartitionSpec catalogPartitionSpec) {
-throw new UnsupportedOperationException("Not implement yet");
+Map newOptions = new 
HashMap<>(context.getCatalogTable().getOptions());
+FileStore fileStore = buildTableStore(context).buildFileStore();
+FileStoreScan.Plan plan =
+fileStore
+.newScan()
+.withPartitionFilter(
+PredicateConverter.CONVERTER.fromMap(
+
catalogPartitionSpec.getPartitionSpec(),
+fileStore.partitionType()))
+.plan();
+
+Preconditions.checkState(
+plan.snapshotId() != null && !plan.files().isEmpty(),
+"The specified %s to compact does not exist any snapshot",
+catalogPartitionSpec.getPartitionSpec().isEmpty()
+? "table"
+: String.format("partition %s", 
catalogPartitionSpec.getPartitionSpec()));
+Map>> groupBy = 
plan.groupByPartFiles();
+if 
(!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key( {
+groupBy =
+pickManifest(
+groupBy,
+new 
FileStoreOptions(Configuration.fromMap(newOptions))
+.mergeTreeOptions(),
+new 
KeyComparatorSupplier(fileStore.partitionType()).get());
+}
+try {
+newOptions.put(
+COMPACTION_SCANNED_MANIFEST.key(),
+Base64.getEncoder()
+.encodeToString(
+InstantiationUtil.serializeObject(
+new PartitionedManifestMeta(
+plan.snapshotId(), 
groupBy;
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+return newOptions;
+}
+
+@VisibleForTesting
+Map>> pickManifest(
+Map>> groupBy,
+MergeTreeOptions options,
+Comparator keyComparator) {
+Map>> filtered = new 
HashMap<>();
+
+for (Map.Entry>> 
partEntry :
+groupBy.entrySet()) {
+Map> manifests = new HashMap<>();
+for (Map.Entry> bucketEntry :
+partEntry.getValue().entrySet()) {
+List smallFiles =
+bucketEntry.getValue().stream()
+.filter(fileMeta -> fileMeta.fileSize() < 
options.targetFileSize)
+.collect(Collectors.toList());
+List intersectedFiles =
+new IntervalPartition(bucketEntry.getValue(), 
keyComparator)
+.partition().stream()
+.filter(section -> section.size() > 1)
+.flatMap(Collection::stream)
+.map(SortedRun::files)
+.flatMap(Collection::stream)
+.collect(Collectors.toList());
+
+List filteredFiles =

Review Comment:
   `bucketFiles`?



##
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java:
##
@@ -256,9 +284,288 @@ public void testCreateAndCheckTableStore(
 }
 }
 
+@Test
+public void testOnCompactTableForNoSnapshot() {
+RowType partType = RowType.of();
+MockTableStoreManagedFactory mockTableStoreManagedFactory =
+new MockTableStoreManagedFactory(partType, 
NON_PARTITIONED_ROW_TYPE);
+prepare(
+TABLE + "_" + UUID.randomUUID(),
+partType,
+NON_PARTITIONED_ROW_TYPE,
+NON_PARTITIONED,
+ 

[jira] [Commented] (FLINK-27834) Flink kubernetes operator dockerfile could not work with podman

2022-05-29 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543753#comment-17543753
 ] 

Yang Wang commented on FLINK-27834:
---

[~gyfora] Given that the copied directory is only used for an ephemeral Maven build, do 
you have any concerns about this?

> Flink kubernetes operator dockerfile could not work with podman
> ---
>
> Key: FLINK-27834
> URL: https://issues.apache.org/jira/browse/FLINK-27834
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Priority: Major
>
> [1/2] STEP 16/19: COPY *.git ./.git
> Error: error building at STEP "COPY *.git ./.git": checking on sources under 
> "/root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0": Rel: can't 
> make  relative to 
> /root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0; copier: stat: 
> ["/*.git"]: no such file or directory
>  
> podman version
> Client:       Podman Engine
> Version:      4.0.2
> API Version:  4.0.2
>  
>  
> I think the root cause is "*.git" is not respected by podman. Maybe we could 
> simply copy the whole directory when building the image.
>  
> {code:java}
> WORKDIR /app
> COPY . .
> RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl 
> !flink-kubernetes-docs -DskipTests=$SKIP_TESTS {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27834) Flink kubernetes operator dockerfile could not work with podman

2022-05-29 Thread Yang Wang (Jira)
Yang Wang created FLINK-27834:
-

 Summary: Flink kubernetes operator dockerfile could not work with 
podman
 Key: FLINK-27834
 URL: https://issues.apache.org/jira/browse/FLINK-27834
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Reporter: Yang Wang


[1/2] STEP 16/19: COPY *.git ./.git

Error: error building at STEP "COPY *.git ./.git": checking on sources under 
"/root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0": Rel: can't make  
relative to /root/FLINK/release-1.0-rc2/flink-kubernetes-operator-1.0.0; 
copier: stat: ["/*.git"]: no such file or directory

 

podman version
Client:       Podman Engine
Version:      4.0.2
API Version:  4.0.2

 

 

I think the root cause is that "*.git" is not respected by podman. Maybe we could 
simply copy the whole directory when building the image.

 
{code:java}
WORKDIR /app

COPY . .

RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl 
!flink-kubernetes-docs -DskipTests=$SKIP_TESTS {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

2022-05-29 Thread GitBox


tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r884370488


##
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java:
##
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for partial update. */
+public class AggregationITCase extends FileStoreTableITCase {
+
+@Override
+protected List ddl() {
+
+String ddl1 =
+"CREATE TABLE IF NOT EXISTS T3 ( "
++ " a STRING, "
++ " b BIGINT, "
++ " c INT, "
++ " PRIMARY KEY (a) NOT ENFORCED )"
++ " WITH ("
++ " 'merge-engine'='aggregation' ,"
++ " 'b.aggregate-function'='sum' ,"
++ " 'c.aggregate-function'='sum' "
++ " );";
+String ddl2 =
+"CREATE TABLE IF NOT EXISTS T4 ( "
++ " a STRING,"
++ " b INT,"
++ " c DOUBLE,"
++ " PRIMARY KEY (a, b) NOT ENFORCED )"
++ " WITH ("
++ " 'merge-engine'='aggregation',"
++ " 'c.aggregate-function' = 'sum'"
++ " );";
+String ddl3 =
+"CREATE TABLE IF NOT EXISTS T5 ( "
++ " a STRING,"
++ " b INT,"
++ " c DOUBLE,"
++ " PRIMARY KEY (a) NOT ENFORCED )"
++ " WITH ("
++ " 'merge-engine'='aggregation',"
++ " 'b.aggregate-function' = 'sum'"
++ " );";
+List lists = new ArrayList<>();
+lists.add(ddl1);
+lists.add(ddl2);
+lists.add(ddl3);
+return lists;
+}
+
+@Test
+public void testCreateAggregateFunction() throws ExecutionException, 
InterruptedException {
+List result;
+
+// T5
+try {
+bEnv.executeSql("INSERT INTO T5 VALUES " + "('pk1',1, 2.0), " + 
"('pk1',1, 2.0)")
+.await();
+throw new AssertionError("create table T5 should failed");
+} catch (IllegalArgumentException e) {
+assert ("should  set aggregate function for every column not part 
of primary key"
+.equals(e.getLocalizedMessage()));
+}
+}
+
+@Test
+public void testMergeInMemory() throws ExecutionException, 
InterruptedException {
+List result;
+// T3
+bEnv.executeSql("INSERT INTO T3 VALUES " + "('pk1',1, 2), " + 
"('pk1',1, 2)").await();
+result = iteratorToList(bEnv.from("T3").execute().collect());
+assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 2L, 4));
+
+// T4
+bEnv.executeSql("INSERT INTO T4 VALUES " + "('pk1',1, 2.0), " + 
"('pk1',1, 2.0)").await();
+result = iteratorToList(bEnv.from("T4").execute().collect());
+assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 1, 4.0));
+}
+
+@Test
+public void testMergeRead() throws ExecutionException, 
InterruptedException {
+List result;
+// T3
+bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 2)").await();
+bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 4)").await();
+bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',2, 0)").await();
+result = iteratorToList(bEnv.from("T3").execute().collect());
+assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 4L, 6));
+
+// T4
+bEnv.executeSql("INSERT INTO T4 VALUES ('pk1',1, 2.0)").await();
+bEnv.executeSql("INSERT I

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

2022-05-29 Thread GitBox


JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884375368


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java:
##
@@ -155,7 +155,7 @@ static Map 
filterLogStoreOptions(Map options) {
 Map.Entry::getValue));
 }
 
-static TableStore buildTableStore(DynamicTableFactory.Context context) {

Review Comment:
   Add documentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

2022-05-29 Thread GitBox


Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r884374610


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##
@@ -79,8 +81,19 @@ public class DeclarativeSlotManager implements SlotManager {
 private final Map jobMasterTargetAddresses = new 
HashMap<>();
 private final Map pendingSlotAllocations;
 
+/** Delay of the requirement change check in the slot manager. */
+private final Duration requirementsCheckDelay;

Review Comment:
   Got it, will do.



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##
@@ -79,8 +81,19 @@ public class DeclarativeSlotManager implements SlotManager {
 private final Map jobMasterTargetAddresses = new 
HashMap<>();
 private final Map pendingSlotAllocations;
 
+/** Delay of the requirement change check in the slot manager. */
+private final Duration requirementsCheckDelay;

Review Comment:
   Do you mean creating another ticket for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-26179) Support periodic savepointing in the operator

2022-05-29 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543748#comment-17543748
 ] 

Gyula Fora commented on FLINK-26179:


I think the question of configurable native/canonical savepoint format is not 
specific to this feature. It also applies to upgrades and manual triggering.

Users should naturally not configure a small periodic savepoint interval, but 
otherwise it's completely up to them.

> Support periodic savepointing in the operator
> -
>
> Key: FLINK-26179
> URL: https://issues.apache.org/jira/browse/FLINK-26179
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
>
> Automatic triggering of savepoints is a commonly requested feature.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-26179) Support periodic savepointing in the operator

2022-05-29 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543744#comment-17543744
 ] 

Yang Wang commented on FLINK-26179:
---

It is harmless to introduce such a new feature. Do we need to support the 
NATIVE savepoint format type?

If not, we should make users aware that taking a savepoint in CANONICAL format is 
expensive, so the interval should not be configured as small as the checkpoint interval.

> Support periodic savepointing in the operator
> -
>
> Key: FLINK-26179
> URL: https://issues.apache.org/jira/browse/FLINK-26179
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
>
> Automatic triggering of savepoints is a commonly requested feature.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27833) PulsarSourceITCase.testTaskManagerFailure failed with AssertionError

2022-05-29 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-27833:


 Summary: PulsarSourceITCase.testTaskManagerFailure failed with 
AssertionError
 Key: FLINK-27833
 URL: https://issues.apache.org/jira/browse/FLINK-27833
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.14.4
Reporter: Huang Xingbo



{code:java}
2022-05-28T02:40:24.3137097Z May 28 02:40:24 [ERROR] Tests run: 8, Failures: 1, 
Errors: 0, Skipped: 0, Time elapsed: 123.914 s <<< FAILURE! - in 
org.apache.flink.connector.pulsar.source.PulsarSourceITCase
2022-05-28T02:40:24.3139274Z May 28 02:40:24 [ERROR] 
testTaskManagerFailure{TestEnvironment, ExternalContext, 
ClusterControllable}[2]  Time elapsed: 25.823 s  <<< FAILURE!
2022-05-28T02:40:24.3140450Z May 28 02:40:24 java.lang.AssertionError: 
2022-05-28T02:40:24.3141062Z May 28 02:40:24 
2022-05-28T02:40:24.3141875Z May 28 02:40:24 Expected: Records consumed by 
Flink should be identical to test data and preserve the order in split
2022-05-28T02:40:24.3143710Z May 28 02:40:24  but: Mismatched record at 
position 18: Expected '0-iuKyiUwhhuO3NoDPvpLxIUw3' but was '0-98Y'
2022-05-28T02:40:24.3145019Z May 28 02:40:24at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
2022-05-28T02:40:24.3146064Z May 28 02:40:24at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
2022-05-28T02:40:24.3147334Z May 28 02:40:24at 
org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase.testTaskManagerFailure(SourceTestSuiteBase.java:289)
2022-05-28T02:40:24.3148548Z May 28 02:40:24at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-05-28T02:40:24.3149690Z May 28 02:40:24at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-05-28T02:40:24.3150910Z May 28 02:40:24at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-05-28T02:40:24.3158249Z May 28 02:40:24at 
java.lang.reflect.Method.invoke(Method.java:498)
2022-05-28T02:40:24.3159819Z May 28 02:40:24at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
2022-05-28T02:40:24.3161222Z May 28 02:40:24at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
2022-05-28T02:40:24.3162609Z May 28 02:40:24at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
2022-05-28T02:40:24.3164030Z May 28 02:40:24at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
2022-05-28T02:40:24.3165548Z May 28 02:40:24at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
2022-05-28T02:40:24.3167224Z May 28 02:40:24at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
2022-05-28T02:40:24.3169087Z May 28 02:40:24at 
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
2022-05-28T02:40:24.3170540Z May 28 02:40:24at 
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
2022-05-28T02:40:24.3171970Z May 28 02:40:24at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
2022-05-28T02:40:24.3173499Z May 28 02:40:24at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
2022-05-28T02:40:24.3175023Z May 28 02:40:24at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
2022-05-28T02:40:24.3176435Z May 28 02:40:24at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
2022-05-28T02:40:24.3177715Z May 28 02:40:24at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
2022-05-28T02:40:24.3178985Z May 28 02:40:24at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
2022-05-28T02:40:24.3180563Z May 28 02:40:24at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
2022-05-28T02:40:24.3182004Z May 28 02:40:24at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2022-05-28T02:40:24.3183400Z May 28 02:40:24at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
2022-05-28T02:40:24.3184865Z May 28 02:40:24at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
2022-05-28T02:40:24.3185720Z May 28 02:40:24at 

[jira] [Commented] (FLINK-27009) Support SQL job submission in flink kubernetes opeartor

2022-05-29 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543743#comment-17543743
 ] 

Yang Wang commented on FLINK-27009:
---

[~mbalassi] Maybe you could also share some use cases from your internal users. 
For example, how would they like to submit a SQL job?

Would they prefer a new field named {{SQLScriptURI}}, or a pure raw field 
{{SQLScript}}? We could then integrate that into the proposal.

> Support SQL job submission in flink kubernetes opeartor
> ---
>
> Key: FLINK-27009
> URL: https://issues.apache.org/jira/browse/FLINK-27009
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Biao Geng
>Assignee: Biao Geng
>Priority: Major
>
> Currently, the flink kubernetes opeartor is for jar job using application or 
> session cluster. For SQL job, there is no out of box solution in the 
> operator.  
> One simple and short-term solution is to wrap the SQL script into a jar job 
> using table API with limitation.
> The long-term solution may work with 
> [FLINK-26541|https://issues.apache.org/jira/browse/FLINK-26541] to achieve 
> the full support.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-table-store] JingsongLi commented on pull request #133: [FLINK-27502] Add file Suffix to data files

2022-05-29 Thread GitBox


JingsongLi commented on PR #133:
URL: 
https://github.com/apache/flink-table-store/pull/133#issuecomment-1140603513

   ```
   Error:  Failures: 
   Error:DataFilePathFactoryTest.testNoPartition:46 
   expected: 
/tmp/junit13044672734532131/bucket-123/data-de33b78b-2802-49a0-8ec7-cbeeed7af049-0
but was: 
/tmp/junit13044672734532131/bucket-123/data-de33b78b-2802-49a0-8ec7-cbeeed7af049-0.orc
   Error:DataFilePathFactoryTest.testWithPartition:64 
   expected: 
/tmp/junit3939205851475787579/dt=20211224/bucket-123/data-8a204f50-ad09-4e55-a41f-39c94da0863c-0
but was: 
/tmp/junit3939205851475787579/dt=20211224/bucket-123/data-8a204f50-ad09-4e55-a41f-39c94da0863c-0.orc
   [INFO] 
   ```
   
   The tests fail; you need to adjust the related test code too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-27832) SplitAggregateITCase tests failed with Could not acquire the minimum required resources

2022-05-29 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-27832:


 Summary: SplitAggregateITCase tests failed with Could not acquire 
the minimum required resources
 Key: FLINK-27832
 URL: https://issues.apache.org/jira/browse/FLINK-27832
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.0
Reporter: Huang Xingbo



{code:java}
2022-05-27T17:42:59.4814999Z May 27 17:42:59 [ERROR] Tests run: 64, Failures: 
23, Errors: 1, Skipped: 0, Time elapsed: 305.5 s <<< FAILURE! - in 
org.apache.flink.table.planner.runtime.stream.sql.SplitAggregateITCase
2022-05-27T17:42:59.4815983Z May 27 17:42:59 [ERROR] 
SplitAggregateITCase.testAggWithJoin  Time elapsed: 278.742 s  <<< ERROR!
2022-05-27T17:42:59.4816608Z May 27 17:42:59 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
2022-05-27T17:42:59.4819182Z May 27 17:42:59at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
2022-05-27T17:42:59.4820363Z May 27 17:42:59at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
2022-05-27T17:42:59.4821463Z May 27 17:42:59at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
2022-05-27T17:42:59.4822292Z May 27 17:42:59at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
2022-05-27T17:42:59.4823317Z May 27 17:42:59at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2022-05-27T17:42:59.4824210Z May 27 17:42:59at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
2022-05-27T17:42:59.4825081Z May 27 17:42:59at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268)
2022-05-27T17:42:59.4825927Z May 27 17:42:59at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
2022-05-27T17:42:59.4826748Z May 27 17:42:59at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
2022-05-27T17:42:59.4827596Z May 27 17:42:59at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2022-05-27T17:42:59.4828416Z May 27 17:42:59at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
2022-05-27T17:42:59.4829284Z May 27 17:42:59at 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
2022-05-27T17:42:59.4830111Z May 27 17:42:59at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
2022-05-27T17:42:59.4831015Z May 27 17:42:59at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
2022-05-27T17:42:59.483Z May 27 17:42:59at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
2022-05-27T17:42:59.4833162Z May 27 17:42:59at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
2022-05-27T17:42:59.4834250Z May 27 17:42:59at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
2022-05-27T17:42:59.4835236Z May 27 17:42:59at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2022-05-27T17:42:59.4836035Z May 27 17:42:59at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
2022-05-27T17:42:59.4836872Z May 27 17:42:59at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
2022-05-27T17:42:59.4837630Z May 27 17:42:59at 
akka.dispatch.OnComplete.internal(Future.scala:300)
2022-05-27T17:42:59.4838394Z May 27 17:42:59at 
akka.dispatch.OnComplete.internal(Future.scala:297)
2022-05-27T17:42:59.4839044Z May 27 17:42:59at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
2022-05-27T17:42:59.4839748Z May 27 17:42:59at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
2022-05-27T17:42:59.4840463Z May 27 17:42:59at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
2022-05-27T17:42:59.4841313Z May 27 17:42:59at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
2022-05-27T17:42:59.4842335Z May 27 17:42:59at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
2022-05-27T17:42:59.4843300Z May 27 17:42:59at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
2022-05-27T17:42:59.4844146Z May 27 17:42:59at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
2022-05-27T17:42:59.4844882Z May 27 17:42:59at 
scala.concurrent.impl.Promise$DefaultPromise.t

[jira] [Updated] (FLINK-27625) Add query hint for async lookup join

2022-05-29 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-27625:

Description: 
The hint name discussion thread: 
https://lists.apache.org/thread/jm9kg33wk9z2bvo2b0g5bp3n5kfj6qv8

FLINK-27623 adds a global parameter 'table.exec.async-lookup.output-mode' for 
table users so that all three control parameters related to async I/O can be 
configured at the same job level.
As planned in that issue, we'd like to go a step further and offer more precise, 
per-join control beyond the job-level config by introducing a new join hint: 
'ASYNC_LOOKUP'.

For the hint options, to keep them intuitive and user-friendly, we want to support 
both a simple form and a kv form, with all options except the table name being 
optional (the job-level configuration is used if not set).

# 1. simple form: (ordered hint option list)
```
ASYNC_LOOKUP('tableName'[, 'output-mode', 'buffer-capacity', 'timeout'])
optional:
output-mode
buffer-capacity
timeout
```

Note: since Calcite currently does not support the mixed type hint options,
the table name here needs to be a string instead of an identifier. (For
`SqlHint`: The option format can not be mixed in, they should either be all
simple identifiers or all literals or all key value pairs.) We can improve
this after Calcite support.

# 2. kv form: (support unordered hint option list)
```
ASYNC_LOOKUP('table'='tableName'[, 'output-mode'='ordered|allow-unordered',
'capacity'='int', 'timeout'='duration'])

optional kvs:
'output-mode'='ordered|allow-unordered'
'capacity'='int'
'timeout'='duration'
```

e.g., if the job level configuration is:
```
table.exec.async-lookup.output-mode: ORDERED
table.exec.async-lookup.buffer-capacity: 100
table.exec.async-lookup.timeout: 180s
```

then the following hints:
```
1. ASYNC_LOOKUP('dim1', 'allow-unordered', '200', '300s')
2. ASYNC_LOOKUP('dim1', 'allow-unordered', '200')
3. ASYNC_LOOKUP('table'='dim1', 'output-mode'='allow-unordered')
4. ASYNC_LOOKUP('table'='dim1', 'timeout'='300s')
5. ASYNC_LOOKUP('table'='dim1', 'capacity'='300')
```

are equivalent to:
```
1. ASYNC_LOOKUP('dim1', 'allow-unordered', '200', '300s')
2. ASYNC_LOOKUP('dim1', 'allow-unordered', '200', '180s')
3. ASYNC_LOOKUP('table'='dim1', 'output-mode'='allow-unordered',
'capacity'='100', 'timeout'='180s')
4. ASYNC_LOOKUP('table'='dim1', 'output-mode'='ordered', 'capacity'='100',
'timeout'='300s')
5. ASYNC_LOOKUP('table'='dim1', 'output-mode'='ordered', 'capacity'='300',
'timeout'='180s')
```

In addition, if the lookup source implements both sync and async table
function, the planner prefers to choose the async function when the
'ASYNC_LOOKUP' hint is specified.




  was:
Add query hint for async lookup join for join level control:
e.g., 
{code}
// ordered mode
ASYNC_LOOKUP(dim1, 'ordered', '100', '180s')
// unordered mode
ASYNC_LOOKUP(dim1, 'allow-unordered', '100', '180s')
{code}

TODO: The hint name should be discussed in ML.


> Add query hint for async lookup join
> 
>
> Key: FLINK-27625
> URL: https://issues.apache.org/jira/browse/FLINK-27625
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
> Fix For: 1.16.0
>
>
> The hint name discuss thread: 
> https://lists.apache.org/thread/jm9kg33wk9z2bvo2b0g5bp3n5kfj6qv8
> FLINK-27623 adds a global parameter 'table.exec.async-lookup.output-mode' for 
> table users so that all three control parameters related to async I/O can be 
> configured at the same job level.
> As planned in the issue, we‘d like to go a step further to offer more precise 
> control for async join operation more than job level config, to introduce a 
> new join hint: ‘ASYNC_LOOKUP’.
> For the hint option, for intuitive and user-friendly reasons, we want to 
> support both simple and kv forms, with all options except table name being 
> optional (use job level configuration if not set)
> # 1. simple form: (ordered hint option list)
> ```
> ASYNC_LOOKUP('tableName'[, 'output-mode', 'buffer-capacity', 'timeout'])
> optional:
> output-mode
> buffer-capacity
> timeout
> ```
> Note: since Calcite currently does not support the mixed type hint options,
> the table name here needs to be a string instead of an identifier. (For
> `SqlHint`: The option format can not be mixed in, they should either be all
> simple identifiers or all literals or all key value pairs.) We can improve
> this after Calcite support.
> # 2. kv form: (support unordered hint option list)
> ```
> ASYNC_LOOKUP('table'='tableName'[, 'output-mode'='ordered|allow-unordered',
> 'capacity'='int', 'timeout'='duration'])
> optional kvs:
> 'output-mode'='ordered|allow-unordered'
> 'capacity'='int'
> 'timeout'='duration'
> ```
> e.g., if the job level configuration is:
>

[jira] [Updated] (FLINK-27625) Add query hint for async lookup join

2022-05-29 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-27625:

Description: 
The hint name discussion thread: 
https://lists.apache.org/thread/jm9kg33wk9z2bvo2b0g5bp3n5kfj6qv8

FLINK-27623 adds a global parameter 'table.exec.async-lookup.output-mode' for 
table users so that all three control parameters related to async I/O can be 
configured at the same job level.
As planned in that issue, we'd like to go a step further and offer more precise, 
per-join control beyond the job-level config by introducing a new join hint: 
'ASYNC_LOOKUP'.

For the hint options, to keep them intuitive and user-friendly, we want to support 
both a simple form and a kv form, with all options except the table name being 
optional (the job-level configuration is used if not set).

1. simple form: (ordered hint option list)
```
ASYNC_LOOKUP('tableName'[, 'output-mode', 'buffer-capacity', 'timeout'])
optional:
output-mode
buffer-capacity
timeout
```

Note: since Calcite currently does not support the mixed type hint options,
the table name here needs to be a string instead of an identifier. (For
`SqlHint`: The option format can not be mixed in, they should either be all
simple identifiers or all literals or all key value pairs.) We can improve
this after Calcite support.

2. kv form: (support unordered hint option list)
```
ASYNC_LOOKUP('table'='tableName'[, 'output-mode'='ordered|allow-unordered',
'capacity'='int', 'timeout'='duration'])

optional kvs:
'output-mode'='ordered|allow-unordered'
'capacity'='int'
'timeout'='duration'
```

e.g., if the job level configuration is:
```
table.exec.async-lookup.output-mode: ORDERED
table.exec.async-lookup.buffer-capacity: 100
table.exec.async-lookup.timeout: 180s
```

then the following hints:
```
1. ASYNC_LOOKUP('dim1', 'allow-unordered', '200', '300s')
2. ASYNC_LOOKUP('dim1', 'allow-unordered', '200')
3. ASYNC_LOOKUP('table'='dim1', 'output-mode'='allow-unordered')
4. ASYNC_LOOKUP('table'='dim1', 'timeout'='300s')
5. ASYNC_LOOKUP('table'='dim1', 'capacity'='300')
```

are equivalent to:
```
1. ASYNC_LOOKUP('dim1', 'allow-unordered', '200', '300s')
2. ASYNC_LOOKUP('dim1', 'allow-unordered', '200', '180s')
3. ASYNC_LOOKUP('table'='dim1', 'output-mode'='allow-unordered',
'capacity'='100', 'timeout'='180s')
4. ASYNC_LOOKUP('table'='dim1', 'output-mode'='ordered', 'capacity'='100',
'timeout'='300s')
5. ASYNC_LOOKUP('table'='dim1', 'output-mode'='ordered', 'capacity'='300',
'timeout'='180s')
```

In addition, if the lookup source implements both sync and async table
function, the planner prefers to choose the async function when the
'ASYNC_LOOKUP' hint is specified.
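
As a purely illustrative sketch (assuming the hint syntax lands as proposed above; `orders` and `dim1` are hypothetical tables that would have to be declared first, with `dim1` backed by a lookup-capable connector), a hinted lookup join could look like:
```
// Illustrative only; the ASYNC_LOOKUP hint is still a proposal and may change.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Job-level defaults; the hint below overrides them for this join only.
tEnv.getConfig().getConfiguration()
        .setString("table.exec.async-lookup.output-mode", "ORDERED");
tEnv.getConfig().getConfiguration()
        .setString("table.exec.async-lookup.timeout", "180s");

tEnv.executeSql(
        "SELECT /*+ ASYNC_LOOKUP('table'='dim1', 'output-mode'='allow-unordered', "
                + "'capacity'='200') */ o.id, d.name "
                + "FROM orders AS o "
                + "JOIN dim1 FOR SYSTEM_TIME AS OF o.proctime AS d "
                + "ON o.id = d.id");
```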




  was:
The hint name discuss thread: 
https://lists.apache.org/thread/jm9kg33wk9z2bvo2b0g5bp3n5kfj6qv8

FLINK-27623 adds a global parameter 'table.exec.async-lookup.output-mode' for 
table users so that all three control parameters related to async I/O can be 
configured at the same job level.
As planned in the issue, we‘d like to go a step further to offer more precise 
control for async join operation more than job level config, to introduce a new 
join hint: ‘ASYNC_LOOKUP’.

For the hint option, for intuitive and user-friendly reasons, we want to 
support both simple and kv forms, with all options except table name being 
optional (use job level configuration if not set)

# 1. simple form: (ordered hint option list)
```
ASYNC_LOOKUP('tableName'[, 'output-mode', 'buffer-capacity', 'timeout'])
optional:
output-mode
buffer-capacity
timeout
```

Note: since Calcite currently does not support the mixed type hint options,
the table name here needs to be a string instead of an identifier. (For
`SqlHint`: The option format can not be mixed in, they should either be all
simple identifiers or all literals or all key value pairs.) We can improve
this after Calcite support.

# 2. kv form: (support unordered hint option list)
```
ASYNC_LOOKUP('table'='tableName'[, 'output-mode'='ordered|allow-unordered',
'capacity'='int', 'timeout'='duration'])

optional kvs:
'output-mode'='ordered|allow-unordered'
'capacity'='int'
'timeout'='duration'
```

e.g., if the job level configuration is:
```
table.exec.async-lookup.output-mode: ORDERED
table.exec.async-lookup.buffer-capacity: 100
table.exec.async-lookup.timeout: 180s
```

then the following hints:
```
1. ASYNC_LOOKUP('dim1', 'allow-unordered', '200', '300s')
2. ASYNC_LOOKUP('dim1', 'allow-unordered', '200')
3. ASYNC_LOOKUP('table'='dim1', 'output-mode'='allow-unordered')
4. ASYNC_LOOKUP('table'='dim1', 'timeout'='300s')
5. ASYNC_LOOKUP('table'='dim1', 'capacity'='300')
```

are equivalent to:
```
1. ASYNC_LOOKUP('dim1', 'allow-unordered', '200', '300s')
2. ASYNC_LOOKUP('dim1', 'allow-unordered', '200', '180s')
3. ASYNC_LOOKUP('table'='dim1', 'output-mode'='allow-unordered',
'capacity'='100', 'timeout'='180s')
4. ASYNC_LOOKUP('table'='dim1', 'output-mode'='ordered', 'capacity

[GitHub] [flink] flinkbot commented on pull request #19842: [FLINK-27522][network] Ignore max buffers per channel when allocate buffer

2022-05-29 Thread GitBox


flinkbot commented on PR #19842:
URL: https://github.com/apache/flink/pull/19842#issuecomment-1140596253

   
   ## CI report:
   
   * aa380f29ca66553b6f168a177dcedb4563efb05a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27826) Support machine learning training for very high dimesional models

2022-05-29 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-27826:
--
Description: 
There is limited support for training high-dimensional machine learning models 
in FlinkML, although it is often needed, especially in industrial use cases. When the 
model parameters cannot be held in the memory of a single machine, FlinkML currently 
crashes.

So it would be nice to support high-dimensional model training in FlinkML. 
To achieve this, we probably need to do the following:
 # Survey how existing machine learning systems train large models (e.g. data 
parallelism, model parallelism).
 # Define/implement the infrastructure to support large model training in FlinkML.
 # Implement a machine learning model (e.g. logistic regression, factorization 
machine) that can train models with more than ten billion parameters.
 # Benchmark the implementation and further improve it.

  was:
There is limited support for training high-dimensional machine learning models 
in FlinkML even though it is often needed, especially in industrial use cases. 
When the model parameters cannot be held in the memory of a single machine, 
FlinkML currently crashes.

So it would be nice to support high-dimensional model training in FlinkML. 
To achieve this, we probably need to do the following things:
 # Survey how existing machine learning systems train large models (e.g. data 
parallel, model parallel).
 # Define/Implement the infrastructure for large model training in FlinkML.
 # Implement a logistic regression model that can train models with more than 
ten billion parameters.
 # Benchmark the implementation and further improve it.


> Support machine learning training for very high dimensional models
> -
>
> Key: FLINK-27826
> URL: https://issues.apache.org/jira/browse/FLINK-27826
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>
> There is limited support for training high-dimensional machine learning 
> models in FlinkML even though it is often needed, especially in industrial use 
> cases. When the model parameters cannot be held in the memory of a single 
> machine, FlinkML currently crashes.
> So it would be nice to support high-dimensional model training in FlinkML. 
> To achieve this, we probably need to do the following things:
>  # Survey how existing machine learning systems train large models (e.g. data 
> parallel, model parallel).
>  # Define/Implement the infrastructure for large model training in FlinkML.
>  # Implement a machine learning model (e.g., logistic regression, 
> factorization machine, etc.) that can train models with more than ten billion 
> parameters.
>  # Benchmark the implementation and further improve it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27522) Ignore max buffers per channel when allocate buffer

2022-05-29 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27522:
---
Labels: pull-request-available  (was: )

> Ignore max buffers per channel when allocate buffer
> ---
>
> Key: FLINK-27522
> URL: https://issues.apache.org/jira/browse/FLINK-27522
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Network
>Affects Versions: 1.14.0, 1.15.0
>Reporter: fanrui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> This is first task of  
> [FLIP-227|https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer]
> The LocalBufferPool becomes unavailable when maxBuffersPerChannel is 
> reached for a channel or when availableMemorySegments is empty.
> With this change, if a memory segment is requested from the LocalBufferPool and 
> maxBuffersPerChannel is already reached for the target channel, that limit is 
> ignored and buffers continue to be allocated as long as availableMemorySegments 
> is not empty in the LocalBufferPool.
>  
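
For illustration, a minimal sketch of the rule change described above, not the actual LocalBufferPool code (the real pool has availability listeners, overdraft handling, destruction logic, etc.); the class and field names only mirror the description:

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, simplified model of the per-channel quota check. */
final class SimplifiedBufferPool {
    private final Queue<byte[]> availableMemorySegments = new ArrayDeque<>();
    private final int[] buffersPerChannel;
    private final int maxBuffersPerChannel;

    SimplifiedBufferPool(int numChannels, int maxBuffersPerChannel) {
        this.buffersPerChannel = new int[numChannels];
        this.maxBuffersPerChannel = maxBuffersPerChannel;
    }

    /** Old behaviour: the request fails once the per-channel quota is reached. */
    byte[] requestSegmentOld(int targetChannel) {
        if (buffersPerChannel[targetChannel] >= maxBuffersPerChannel
                || availableMemorySegments.isEmpty()) {
            return null;
        }
        buffersPerChannel[targetChannel]++;
        return availableMemorySegments.poll();
    }

    /** Proposed behaviour: the quota is ignored as long as free segments exist. */
    byte[] requestSegmentProposed(int targetChannel) {
        if (availableMemorySegments.isEmpty()) {
            return null;
        }
        buffersPerChannel[targetChannel]++;
        return availableMemorySegments.poll();
    }
}
{code}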



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] 1996fanrui opened a new pull request, #19842: [FLINK-27522][network] Ignore max buffers per channel when allocate buffer

2022-05-29 Thread GitBox


1996fanrui opened a new pull request, #19842:
URL: https://github.com/apache/flink/pull/19842

   ## What is the purpose of the change
   
   Ignore max buffers per channel when allocate buffer
   
   ## Brief change log
   
   Ignore max buffers per channel when allocate buffer
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`LocalBufferPoolTest#testMaxBuffersPerChannelAndAvailability`.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-27831) Provide example of Beam on the k8s operator

2022-05-29 Thread Jira
Márton Balassi created FLINK-27831:
--

 Summary: Provide example of Beam on the k8s operator
 Key: FLINK-27831
 URL: https://issues.apache.org/jira/browse/FLINK-27831
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Márton Balassi
Assignee: Márton Balassi
 Fix For: kubernetes-operator-1.1.0


Multiple users have asked whether the operator supports Beam jobs in 
different shapes. I assume that running a Beam job with the current 
operator ultimately comes down to having the right jars on the classpath / 
packaged into the user's fatjar.

At this stage I suggest adding one such example; providing it might attract new 
users.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

2022-05-29 Thread GitBox


KarmaGYZ commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r884357127


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##
@@ -79,8 +81,19 @@ public class DeclarativeSlotManager implements SlotManager {
 private final Map jobMasterTargetAddresses = new 
HashMap<>();
 private final Map pendingSlotAllocations;
 
+/** Delay of the requirement change check in the slot manager. */
+private final Duration requirementsCheckDelay;

Review Comment:
   Yes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27827) StreamExecutionEnvironment method supporting explicit Boundedness

2022-05-29 Thread Andreas Hailu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andreas Hailu updated FLINK-27827:
--
Description: 
When creating a {{{}DataStreamSource{}}}, an explicitly bounded input is only 
returned if the {{InputFormat}} provided implements {{{}FileInputFormat{}}}. 
This results in runtime exceptions when trying to run applications in Batch 
execution mode while using non-{{{}FileInputFormat{}}}s, e.g. Apache Iceberg 
[1], Flink's Hadoop MapReduce compatibility API inputs [2], etc.

I understand there is a {{DataSource}} API [3] that supports the specification 
of the boundedness of an input, but that would require all connectors to revise 
their APIs to leverage it which would take some time.

My organization is in the middle of migrating from the {{DataSet}} API to the 
{{{}DataStream API{}}}, and we've found this to be a challenge as nearly all of 
our inputs have been determined to be unbounded as we use {{InputFormats}} that 
are not {{{}FileInputFormat{}}}s.

Our work-around was to provide a local patch in {{StreamExecutionEnvironment}} 
with a method supporting explicitly bounded inputs.

As this helped us implement a Batch {{DataStream}} solution, perhaps this is 
something that may be helpful for others?

 

[1] [https://iceberg.apache.org/docs/latest/flink/#reading-with-datastream]

[2] 
[https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/dataset/hadoop_map_reduce/]
 

[3] 
[https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/sources/#the-data-source-api]
 

  was:
When creating a {{{}DataStreamSource{}}}, an explicitly bounded input is only 
returned if the {{InputFormat}} provided implements {{{}FileInputFormat{}}}. 
This is results in runtime exceptions when trying to run applications in Batch 
execution mode while using non {{{}FileInputFormat{}}}s e.g. Apache Iceberg 
[1], Flink's Hadoop MapReduce compatibility API's [2] inputs, etc...

I understand there is a {{DataSource}} API [3] that supports the specification 
of the boundedness of an input, but that would require all connectors to revise 
their APIs to leverage it which would take some time.

My organization is in the middle of migrating from the {{DataSet}} API to the 
{{{}DataStream API{}}}, and we've found this to be a challenge as nearly all of 
our inputs have been determined to be unbounded as we use{{ InputFormats}} that 
are not {{{}FileInputFormat{}}}s.

Our work-around was to provide a local patch in {{StreamExecutionEnvironment}} 
with a method supporting explicitly bounded inputs.

As this helped us implement a Batch {{DataStream}} solution, perhaps this is 
something that may be helpful for others?

 

[1] [https://iceberg.apache.org/docs/latest/flink/#reading-with-datastream]

[2] 
[https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/dataset/hadoop_map_reduce/]
 

[3] 
[https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/sources/#the-data-source-api]
 


> StreamExecutionEnvironment method supporting explicit Boundedness
> -
>
> Key: FLINK-27827
> URL: https://issues.apache.org/jira/browse/FLINK-27827
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Andreas Hailu
>Priority: Minor
>
> When creating a {{{}DataStreamSource{}}}, an explicitly bounded input is only 
> returned if the {{InputFormat}} provided implements {{{}FileInputFormat{}}}. 
> This results in runtime exceptions when trying to run applications in 
> Batch execution mode while using non-{{{}FileInputFormat{}}}s, e.g. Apache 
> Iceberg [1], Flink's Hadoop MapReduce compatibility API inputs [2], etc.
> I understand there is a {{DataSource}} API [3] that supports the 
> specification of the boundedness of an input, but that would require all 
> connectors to revise their APIs to leverage it which would take some time.
> My organization is in the middle of migrating from the {{DataSet}} API to the 
> {{{}DataStream API{}}}, and we've found this to be a challenge as nearly all 
> of our inputs have been determined to be unbounded as we use {{InputFormats}} 
> that are not {{{}FileInputFormat{}}}s.
> Our work-around was to provide a local patch in 
> {{StreamExecutionEnvironment}} with a method supporting explicitly bounded 
> inputs.
> As this helped us implement a Batch {{DataStream}} solution, perhaps this is 
> something that may be helpful for others?
>  
> [1] [https://iceberg.apache.org/docs/latest/flink/#reading-with-datastream]
> [2] 
> [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/dataset/hadoop_map_reduce/]
>  
> [3] 
> [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/sources/#the-data-source-api]
>  
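
For illustration, a hypothetical sketch of what such a local patch could look like inside {{StreamExecutionEnvironment}} (this is not part of Flink's public API; the method name and the use of the boundedness-aware {{DataStreamSource}} constructor are assumptions):

{code:java}
// Hypothetical method added to
// org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
// Relies on existing classes: InputFormat, TypeInformation, Boundedness,
// InputFormatSourceFunction and StreamSource.
public <OUT> DataStreamSource<OUT> createInput(
        InputFormat<OUT, ?> inputFormat,
        TypeInformation<OUT> typeInfo,
        Boundedness boundedness) {
    // InputFormatSourceFunction wraps any InputFormat as a parallel source function.
    InputFormatSourceFunction<OUT> function =
            new InputFormatSourceFunction<>(inputFormat, typeInfo);
    return new DataStreamSource<>(
            this, typeInfo, new StreamSource<>(function), true, "Custom Source", boundedness);
}
{code}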



--
This mes

[jira] [Updated] (FLINK-27827) StreamExecutionEnvironment method supporting explicit Boundedness

2022-05-29 Thread Andreas Hailu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andreas Hailu updated FLINK-27827:
--
Description: 
When creating a {{{}DataStreamSource{}}}, an explicitly bounded input is only 
returned if the {{InputFormat}} provided implements {{{}FileInputFormat{}}}. 
This results in runtime exceptions when trying to run applications in Batch 
execution mode while using non-{{{}FileInputFormat{}}}s, e.g. Apache Iceberg 
[1], Flink's Hadoop MapReduce compatibility API inputs [2], etc.

I understand there is a {{DataSource}} API [3] that supports the specification 
of the boundedness of an input, but that would require all connectors to revise 
their APIs to leverage it which would take some time.

My organization is in the middle of migrating from the {{DataSet}} API to the 
{{{}DataStream API{}}}, and we've found this to be a challenge as nearly all of 
our inputs have been determined to be unbounded as we use{{ InputFormats}} that 
are not {{{}FileInputFormat{}}}s.

Our work-around was to provide a local patch in {{StreamExecutionEnvironment}} 
with a method supporting explicitly bounded inputs.

As this helped us implement a Batch {{DataStream}} solution, perhaps this is 
something that may be helpful for others?

 

[1] [https://iceberg.apache.org/docs/latest/flink/#reading-with-datastream]

[2] 
[https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/dataset/hadoop_map_reduce/]
 

[3] 
[https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/sources/#the-data-source-api]
 

  was:
When creating a {{{}DataStreamSource{}}}, an explicitly bounded input is only 
returned if the {{InputFormat}} provided implements {{{}FileInputFormat{}}}. 
This is results in runtime exceptions when trying to run applications in Batch 
execution mode while using non {{{}FileInputFormat{}}}s e.g. Apache Iceberg 
[1], Flink's Hadoop MapReduce compatibility API's [2] inputs, etc...

I understand there is a {{DataSource}} API [3] that supports the specification 
of the boundedness of an input, but that would require all connectors to revise 
their APIs to leverage it which would take some time.

My organization is in the middle of migrating from the {{DataSet}} API to the 
{{DataStream API, and we've found this to be a challenge as nearly all of our 
inputs have been determines to be unbounded as we use InputFormats}} that are 
not {{{}FileInputFormat{}}}s. Our work-around was to provide a local patch in 
{{StreamExecutionEnvironment}} with a method supporting explicitly bounded 
inputs.

As this helped us implement a Batch {{DataStream}} solution, perhaps this is 
something that may be helpful for others?

 

[1] [https://iceberg.apache.org/docs/latest/flink/#reading-with-datastream]

[2] 
[https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/dataset/hadoop_map_reduce/]
 

[3] 
[https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/sources/#the-data-source-api]
 


> StreamExecutionEnvironment method supporting explicit Boundedness
> -
>
> Key: FLINK-27827
> URL: https://issues.apache.org/jira/browse/FLINK-27827
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Andreas Hailu
>Priority: Minor
>
> When creating a {{{}DataStreamSource{}}}, an explicitly bounded input is only 
> returned if the {{InputFormat}} provided implements {{{}FileInputFormat{}}}. 
> This results in runtime exceptions when trying to run applications in 
> Batch execution mode while using non-{{{}FileInputFormat{}}}s, e.g. Apache 
> Iceberg [1], Flink's Hadoop MapReduce compatibility API inputs [2], etc.
> I understand there is a {{DataSource}} API [3] that supports the 
> specification of the boundedness of an input, but that would require all 
> connectors to revise their APIs to leverage it which would take some time.
> My organization is in the middle of migrating from the {{DataSet}} API to the 
> {{{}DataStream API{}}}, and we've found this to be a challenge as nearly all 
> of our inputs have been determined to be unbounded as we use{{ InputFormats}} 
> that are not {{{}FileInputFormat{}}}s.
> Our work-around was to provide a local patch in 
> {{StreamExecutionEnvironment}} with a method supporting explicitly bounded 
> inputs.
> As this helped us implement a Batch {{DataStream}} solution, perhaps this is 
> something that may be helpful for others?
>  
> [1] [https://iceberg.apache.org/docs/latest/flink/#reading-with-datastream]
> [2] 
> [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/dataset/hadoop_map_reduce/]
>  
> [3] 
> [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/sources/#the-data-source-api]
>  



--
This message was

[jira] [Updated] (FLINK-27827) StreamExecutionEnvironment method supporting explicit Boundedness

2022-05-29 Thread Andreas Hailu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andreas Hailu updated FLINK-27827:
--
Description: 
When creating a {{{}DataStreamSource{}}}, an explicitly bounded input is only 
returned if the {{InputFormat}} provided implements {{{}FileInputFormat{}}}. 
This results in runtime exceptions when trying to run applications in Batch 
execution mode while using non-{{{}FileInputFormat{}}}s, e.g. Apache Iceberg 
[1], Flink's Hadoop MapReduce compatibility API inputs [2], etc.

I understand there is a {{DataSource}} API [3] that supports the specification 
of the boundedness of an input, but that would require all connectors to revise 
their APIs to leverage it which would take some time.

My organization is in the middle of migrating from the {{DataSet}} API to the 
{{DataStream API, and we've found this to be a challenge as nearly all of our 
inputs have been determines to be unbounded as we use InputFormats}} that are 
not {{{}FileInputFormat{}}}s. Our work-around was to provide a local patch in 
{{StreamExecutionEnvironment}} with a method supporting explicitly bounded 
inputs.

As this helped us implement a Batch {{DataStream}} solution, perhaps this is 
something that may be helpful for others?

 

[1] [https://iceberg.apache.org/docs/latest/flink/#reading-with-datastream]

[2] 
[https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/dataset/hadoop_map_reduce/]
 

[3] 
[https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/sources/#the-data-source-api]
 

  was:
When creating a {{{}DataStreamSource{}}}, an explicitly bounded input is only 
returned if the {{InputFormat}} provided implements {{{}FileInputFormat{}}}. 
This is results in runtime exceptions when trying to run applications in Batch 
execution mode while using non {{{}FileInputFormat{}}}s e.g. Apache Iceberg 
[1], Flink's Hadoop MapReduce compatibility API's [2] inputs, etc...

I understand there is a {{DataSource}} API [3] that supports the specification 
of the boundedness of an input, but that would require all connectors to revise 
their APIs to leverage it which would take some time.

My organization is in the middle of migrating from the {{DataSet}} API to the 
{{DataStream }}API, and we've found this to be a challenge as nearly all of our 
inputs have been determines to be unbounded as we use {{InputFormats}} that are 
not {{{}FileInputFormat{}}}s. Our work-around was to provide a local patch in 
{{StreamExecutionEnvironment}} with a method supporting explicitly bounded 
inputs.

As this helped us implement a Batch {{DataStream}} solution, perhaps this is 
something that may be helpful for others?

 

[1] [https://iceberg.apache.org/docs/latest/flink/#reading-with-datastream]

[2] 
[https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/dataset/hadoop_map_reduce/]
 

[3] 
[https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/sources/#the-data-source-api]
 


> StreamExecutionEnvironment method supporting explicit Boundedness
> -
>
> Key: FLINK-27827
> URL: https://issues.apache.org/jira/browse/FLINK-27827
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Andreas Hailu
>Priority: Minor
>
> When creating a {{{}DataStreamSource{}}}, an explicitly bounded input is only 
> returned if the {{InputFormat}} provided implements {{{}FileInputFormat{}}}. 
> This results in runtime exceptions when trying to run applications in 
> Batch execution mode while using non-{{{}FileInputFormat{}}}s, e.g. Apache 
> Iceberg [1], Flink's Hadoop MapReduce compatibility API inputs [2], etc.
> I understand there is a {{DataSource}} API [3] that supports the 
> specification of the boundedness of an input, but that would require all 
> connectors to revise their APIs to leverage it which would take some time.
> My organization is in the middle of migrating from the {{DataSet}} API to the 
> {{DataStream API, and we've found this to be a challenge as nearly all of our 
> inputs have been determines to be unbounded as we use InputFormats}} that are 
> not {{{}FileInputFormat{}}}s. Our work-around was to provide a local patch in 
> {{StreamExecutionEnvironment}} with a method supporting explicitly bounded 
> inputs.
> As this helped us implement a Batch {{DataStream}} solution, perhaps this is 
> something that may be helpful for others?
>  
> [1] [https://iceberg.apache.org/docs/latest/flink/#reading-with-datastream]
> [2] 
> [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/dataset/hadoop_map_reduce/]
>  
> [3] 
> [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/sources/#the-data-source-api]
>  



--
This message was sent by Atlas

[jira] [Comment Edited] (FLINK-27593) Parquet/Orc format reader optimization

2022-05-29 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543551#comment-17543551
 ] 

Jing Ge edited comment on FLINK-27593 at 5/29/22 9:13 PM:
--

[~lsy] would you please describe what the issues are and what should be exactly 
done for this task?


was (Author: jingge):
[~lsy] would you please describe what are the issues and what should be exactly 
done for this task?

> Parquet/Orc format reader optimization
> --
>
> Key: FLINK-27593
> URL: https://issues.apache.org/jira/browse/FLINK-27593
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Runtime
>Reporter: dalongliu
>Priority: Major
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] NicoK closed pull request #10607: [FLINK-15298][docs] Fix dependences in the DataStream API Tutorial doc

2022-05-29 Thread GitBox


NicoK closed pull request #10607: [FLINK-15298][docs] Fix dependences in the 
DataStream API Tutorial doc
URL: https://github.com/apache/flink/pull/10607


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] NicoK commented on pull request #10607: [FLINK-15298][docs] Fix dependences in the DataStream API Tutorial doc

2022-05-29 Thread GitBox


NicoK commented on PR #10607:
URL: https://github.com/apache/flink/pull/10607#issuecomment-1140516269

   sorry for not re-reviewing it earlier - since 1.9 is a bit too old now, let 
me close this PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27830) My Pyflink job could not submit to Flink cluster

2022-05-29 Thread TongMeng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TongMeng updated FLINK-27830:
-
Environment: 
python 3.6.9

Flink 1.13.0

PyFlink 1.13.0

zookeeper 3.4.14

hadoop 2.10.1

> My Pyflink job could not submit to Flink cluster
> 
>
> Key: FLINK-27830
> URL: https://issues.apache.org/jira/browse/FLINK-27830
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.13.0
> Environment: python 3.6.9
> Flink 1.13.0
> PyFlink 1.13.0
> zookeeper 3.4.14
> hadoop 2.10.1
>Reporter: TongMeng
>Priority: Major
> Attachments: error.txt
>
>
> I use the command
> {code:java}
> ./flink run --python /home/ubuntu/pyflink/main.py 
> /home/ubuntu/pyflink/xmltest.xml /home/ubuntu/pyflink/task.xml --pyFliles 
> /home/ubuntu/pyflink/logtest.py /home/ubuntu/pyflink/KafkaSource.py 
> /home/ubuntu/pyflink/KafkaSink.py /home/ubuntu/pyflink/pyFlinkState.py 
> /home/ubuntu/pyflink/projectInit.py /home/ubuntu/pyflink/taskInit.py 
> /home/ubuntu/pyflink/UDF1.py {code}
> to submit my pyflink job.
> The error happened on:
> {code:java}
> st_env.create_statement_set().add_insert_sql(f"insert into algorithmsink 
> select {taskInfo}(line, calculationStatusMap, gathertime, storetime, rawData, 
> terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, 
> `timestamp`) from mysource").execute().wait()
> {code}
> My appendix error.txt contains the exceptions. It seems like there is 
> something wrong with Apache Beam.
> When I use python command to run my job (in standalone mode instead of 
> submitting to Flink cluster), it works well.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27830) My Pyflink job could not submit to Flink cluster

2022-05-29 Thread TongMeng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TongMeng updated FLINK-27830:
-
Description: 
I use the command
{code:java}
./flink run --python /home/ubuntu/pyflink/main.py 
/home/ubuntu/pyflink/xmltest.xml /home/ubuntu/pyflink/task.xml --pyFliles 
/home/ubuntu/pyflink/logtest.py /home/ubuntu/pyflink/KafkaSource.py 
/home/ubuntu/pyflink/KafkaSink.py /home/ubuntu/pyflink/pyFlinkState.py 
/home/ubuntu/pyflink/projectInit.py /home/ubuntu/pyflink/taskInit.py 
/home/ubuntu/pyflink/UDF1.py {code}
to submit my pyflink job.

The error happened on:
{code:java}
st_env.create_statement_set().add_insert_sql(f"insert into algorithmsink select 
{taskInfo}(line, calculationStatusMap, gathertime, storetime, rawData, 
terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, 
`timestamp`) from mysource").execute().wait()
{code}
My appendix error.txt contains the exceptions. It seems like there is something 
wrong with Apache Beam.

When I use python command to run my job (in standalone mode instead of 
submitting to Flink cluster), it works well.

 

 

 

  was:
I use commd
{code:java}
./flink run --python /home/ubuntu/pyflink/main.py 
/home/ubuntu/pyflink/xmltest.xml /home/ubuntu/pyflink/task.xml --pyFliles 
/home/ubuntu/pyflink/logtest.py /home/ubuntu/pyflink/KafkaSource.py 
/home/ubuntu/pyflink/KafkaSink.py /home/ubuntu/pyflink/pyFlinkState.py 
/home/ubuntu/pyflink/projectInit.py /home/ubuntu/pyflink/taskInit.py 
/home/ubuntu/pyflink/UDF1.py {code}
 

to submit my pyflink job.

The error happened on:
{code:java}
st_env.create_statement_set().add_insert_sql(f"insert into algorithmsink select 
{taskInfo}(line, calculationStatusMap, gathertime, storetime, rawData, 
terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, 
`timestamp`) from mysource").execute().wait()
{code}
My appendix error.txt contains the exceptions. It seems like there is something 
wrong with Apache Beam.

When I use python command to run my job (in standalone mode instead of 
submitting to Flink cluster), it works well.

 

 

 


> My Pyflink job could not submit to Flink cluster
> 
>
> Key: FLINK-27830
> URL: https://issues.apache.org/jira/browse/FLINK-27830
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.13.0
>Reporter: TongMeng
>Priority: Major
> Attachments: error.txt
>
>
> I use the command
> {code:java}
> ./flink run --python /home/ubuntu/pyflink/main.py 
> /home/ubuntu/pyflink/xmltest.xml /home/ubuntu/pyflink/task.xml --pyFliles 
> /home/ubuntu/pyflink/logtest.py /home/ubuntu/pyflink/KafkaSource.py 
> /home/ubuntu/pyflink/KafkaSink.py /home/ubuntu/pyflink/pyFlinkState.py 
> /home/ubuntu/pyflink/projectInit.py /home/ubuntu/pyflink/taskInit.py 
> /home/ubuntu/pyflink/UDF1.py {code}
> to submit my pyflink job.
> The error happened on:
> {code:java}
> st_env.create_statement_set().add_insert_sql(f"insert into algorithmsink 
> select {taskInfo}(line, calculationStatusMap, gathertime, storetime, rawData, 
> terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, 
> `timestamp`) from mysource").execute().wait()
> {code}
> My appendix error.txt contains the exceptions. It seems like there is 
> something wrong with Apache Beam.
> When I use python command to run my job (in standalone mode instead of 
> submitting to Flink cluster), it works well.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27830) My Pyflink job could not submit to Flink cluster

2022-05-29 Thread TongMeng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TongMeng updated FLINK-27830:
-
Description: 
I use the command
{code:java}
./flink run --python /home/ubuntu/pyflink/main.py 
/home/ubuntu/pyflink/xmltest.xml /home/ubuntu/pyflink/task.xml --pyFliles 
/home/ubuntu/pyflink/logtest.py /home/ubuntu/pyflink/KafkaSource.py 
/home/ubuntu/pyflink/KafkaSink.py /home/ubuntu/pyflink/pyFlinkState.py 
/home/ubuntu/pyflink/projectInit.py /home/ubuntu/pyflink/taskInit.py 
/home/ubuntu/pyflink/UDF1.py {code}
 

to submit my pyflink job.

The error happened on:
{code:java}
st_env.create_statement_set().add_insert_sql(f"insert into algorithmsink select 
{taskInfo}(line, calculationStatusMap, gathertime, storetime, rawData, 
terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, 
`timestamp`) from mysource").execute().wait()
{code}
My appendix error.txt contains the exceptions. It seems like there is something 
wrong with Apache Beam.

When I use python command to run my job (in standalone mode instead of 
submitting to Flink cluster), it works well.

 

 

 

  was:
I use commd
{code:java}
// code placeholder
./flink run --python /home/ubuntu/pyflink/main.py 
/home/ubuntu/pyflink/xmltest.xml /home/ubuntu/pyflink/task.xml --pyFliles 
/home/ubuntu/pyflink/logtest.py /home/ubuntu/pyflink/KafkaSource.py 
/home/ubuntu/pyflink/KafkaSink.py /home/ubuntu/pyflink/pyFlinkState.py 
/home/ubuntu/pyflink/projectInit.py /home/ubuntu/pyflink/taskInit.py 
/home/ubuntu/pyflink/UDF1.py {code}
 

to submit my pyflink job.

The error happened on:
{code:java}
// code placeholder
st_env.create_statement_set().add_insert_sql(f"insert into algorithmsink select 
{taskInfo}(line, calculationStatusMap, gathertime, storetime, rawData, 
terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, 
`timestamp`) from mysource").execute().wait()
{code}
My appendix error.txt contains the exceptions. It seems like there is something 
wrong with Apache Beam.

When I use python command to run my job (in standalone mode instead of 
submitting to Flink cluster), it works well.

 

 

 


> My Pyflink job could not submit to Flink cluster
> 
>
> Key: FLINK-27830
> URL: https://issues.apache.org/jira/browse/FLINK-27830
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.13.0
>Reporter: TongMeng
>Priority: Major
> Attachments: error.txt
>
>
> I use the command
> {code:java}
> ./flink run --python /home/ubuntu/pyflink/main.py 
> /home/ubuntu/pyflink/xmltest.xml /home/ubuntu/pyflink/task.xml --pyFliles 
> /home/ubuntu/pyflink/logtest.py /home/ubuntu/pyflink/KafkaSource.py 
> /home/ubuntu/pyflink/KafkaSink.py /home/ubuntu/pyflink/pyFlinkState.py 
> /home/ubuntu/pyflink/projectInit.py /home/ubuntu/pyflink/taskInit.py 
> /home/ubuntu/pyflink/UDF1.py {code}
>  
> to submit my pyflink job.
> The error happened on:
> {code:java}
> st_env.create_statement_set().add_insert_sql(f"insert into algorithmsink 
> select {taskInfo}(line, calculationStatusMap, gathertime, storetime, rawData, 
> terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, 
> `timestamp`) from mysource").execute().wait()
> {code}
> My appendix error.txt contains the exceptions. It seems like there is 
> something wrong with Apache Beam.
> When I use python command to run my job (in standalone mode instead of 
> submitting to Flink cluster), it works well.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27830) My Pyflink job could not submit to Flink cluster

2022-05-29 Thread TongMeng (Jira)
TongMeng created FLINK-27830:


 Summary: My Pyflink job could not submit to Flink cluster
 Key: FLINK-27830
 URL: https://issues.apache.org/jira/browse/FLINK-27830
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.0
Reporter: TongMeng
 Attachments: error.txt

I use the command
{code:java}
// code placeholder
./flink run --python /home/ubuntu/pyflink/main.py 
/home/ubuntu/pyflink/xmltest.xml /home/ubuntu/pyflink/task.xml --pyFliles 
/home/ubuntu/pyflink/logtest.py /home/ubuntu/pyflink/KafkaSource.py 
/home/ubuntu/pyflink/KafkaSink.py /home/ubuntu/pyflink/pyFlinkState.py 
/home/ubuntu/pyflink/projectInit.py /home/ubuntu/pyflink/taskInit.py 
/home/ubuntu/pyflink/UDF1.py {code}
 

to submit my pyflink job.

The error happened on:
{code:java}
// code placeholder
st_env.create_statement_set().add_insert_sql(f"insert into algorithmsink select 
{taskInfo}(line, calculationStatusMap, gathertime, storetime, rawData, 
terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, 
`timestamp`) from mysource").execute().wait()
{code}
My appendix error.txt contains the exceptions. It seems like there is something 
wrong with Apache Beam.

When I use python command to run my job (in standalone mode instead of 
submitting to Flink cluster), it works well.

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-26179) Support periodic savepointing in the operator

2022-05-29 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-26179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543656#comment-17543656
 ] 

Márton Balassi commented on FLINK-26179:


Agreed, based on recent discussions with multiple users this comes up too 
frequently to ignore. The point that made it obvious for me is that a user 
rightfully expects to be able to keep the last submitted custom resource in git; 
if we ask them to cron a savepointTriggerNonce update regularly, that will 
result in regular unwanted commits. We should just add a config option for this 
instead.

Gyula's suggestion to not add a new timer callback to the system, but only to 
check once per reconcile loop, is the adequate compromise here (the tradeoff is 
that the reconciliation interval is effectively the lower bound of the 
savepoint trigger interval). Please proceed.
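
For illustration, a minimal sketch of that compromise as plain Java, outside of any operator code; the interval and timestamp names are made up for the example:

{code:java}
/** Hypothetical helper: decide during a reconcile pass whether to trigger a savepoint. */
final class PeriodicSavepointCheck {
    private final long triggerIntervalMillis;

    PeriodicSavepointCheck(long triggerIntervalMillis) {
        this.triggerIntervalMillis = triggerIntervalMillis;
    }

    /** Called once per reconcile loop, so the reconcile interval bounds the precision. */
    boolean shouldTrigger(long lastSavepointTimestampMillis, long nowMillis) {
        return nowMillis - lastSavepointTimestampMillis >= triggerIntervalMillis;
    }
}
{code}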

> Support periodic savepointing in the operator
> -
>
> Key: FLINK-26179
> URL: https://issues.apache.org/jira/browse/FLINK-26179
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
>
> Automatic triggering of savepoints is a commonly requested feature.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27009) Support SQL job submission in flink kubernetes opeartor

2022-05-29 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-27009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543655#comment-17543655
 ] 

Márton Balassi commented on FLINK-27009:


[~bgeng777] cool, I assigned the ticket to you then. Looking forward to the 
FLIP. :)

> Support SQL job submission in flink kubernetes opeartor
> ---
>
> Key: FLINK-27009
> URL: https://issues.apache.org/jira/browse/FLINK-27009
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Biao Geng
>Assignee: Biao Geng
>Priority: Major
>
> Currently, the Flink Kubernetes operator supports jar jobs using an application or 
> session cluster. For SQL jobs, there is no out-of-the-box solution in the 
> operator.  
> One simple, short-term solution is to wrap the SQL script into a jar job 
> using the Table API, with limitations.
> The long-term solution may work with 
> [FLINK-26541|https://issues.apache.org/jira/browse/FLINK-26541] to achieve 
> full support.
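
For illustration, a minimal sketch of the short-term approach (wrapping a SQL script into a jar job via the Table API); the class name, the script argument, and the naive statement splitting are assumptions, not an existing operator feature:

{code:java}
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Hypothetical SQL-runner main class that a jar-based deployment could ship. */
public final class SqlRunner {
    public static void main(String[] args) throws Exception {
        String script = new String(Files.readAllBytes(Paths.get(args[0])));
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Naive ';' splitting; a production runner would need real SQL statement parsing.
        for (String statement : script.split(";")) {
            if (!statement.trim().isEmpty()) {
                tEnv.executeSql(statement.trim());
            }
        }
    }
}
{code}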



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27009) Support SQL job submission in flink kubernetes opeartor

2022-05-29 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-27009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi reassigned FLINK-27009:
--

Assignee: Biao Geng

> Support SQL job submission in flink kubernetes opeartor
> ---
>
> Key: FLINK-27009
> URL: https://issues.apache.org/jira/browse/FLINK-27009
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Biao Geng
>Assignee: Biao Geng
>Priority: Major
>
> Currently, the Flink Kubernetes operator supports jar jobs using an application or 
> session cluster. For SQL jobs, there is no out-of-the-box solution in the 
> operator.  
> One simple, short-term solution is to wrap the SQL script into a jar job 
> using the Table API, with limitations.
> The long-term solution may work with 
> [FLINK-26541|https://issues.apache.org/jira/browse/FLINK-26541] to achieve 
> full support.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #83: [FLINK-27170] Add Transformer and Estimator for OnlineLogisticRegression

2022-05-29 Thread GitBox


weibozhao commented on code in PR #83:
URL: https://github.com/apache/flink-ml/pull/83#discussion_r884283585


##
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/OnlineLogisticRegressionParams.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.logisticregression;
+
+import org.apache.flink.ml.common.param.HasBatchStrategy;
+import org.apache.flink.ml.common.param.HasElasticNet;
+import org.apache.flink.ml.common.param.HasGlobalBatchSize;
+import org.apache.flink.ml.common.param.HasLabelCol;
+import org.apache.flink.ml.common.param.HasReg;
+import org.apache.flink.ml.common.param.HasWeightCol;
+import org.apache.flink.ml.param.DoubleParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params of {@link OnlineLogisticRegression}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface OnlineLogisticRegressionParams<T>
+extends HasLabelCol<T>,
+HasWeightCol<T>,
+HasBatchStrategy<T>,
+HasGlobalBatchSize<T>,
+HasReg<T>,
+HasElasticNet<T>,
+OnlineLogisticRegressionModelParams<T> {
+
+Param<Double> ALPHA =
+new DoubleParam("alpha", "The parameter alpha of ftrl.", 0.1, 
ParamValidators.gt(0.0));
+
+default Double getAlpha() {
+return get(ALPHA);
+}
+
+default T setAlpha(Double value) {
+return set(ALPHA, value);
+}
+
+Param<Double> BETA =
+new DoubleParam("beta", "The parameter beta of ftrl.", 0.1, 
ParamValidators.gt(0.0));

Review Comment:
   I have added the FTRL documentation to the OnlineLogisticRegression docs.
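
For context, in the usual FTRL-Proximal formulation, alpha and beta enter through the per-coordinate learning rate; assuming this is the variant being implemented, the rate at step t for coordinate i is:

{code}
\eta_{t,i} = \frac{\alpha}{\beta + \sqrt{\sum_{s \le t} g_{s,i}^{2}}}
{code}

where g_{s,i} is the gradient of coordinate i at step s.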



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] itaykat commented on pull request #7598: [FLINK-11333][protobuf] First-class serializer support for Protobuf types

2022-05-29 Thread GitBox


itaykat commented on PR #7598:
URL: https://github.com/apache/flink/pull/7598#issuecomment-1140457071

   Any updates regarding this one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27652) CompactManager.Rewriter cannot handle different partition keys invoked compaction

2022-05-29 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-27652:
--
Description: 
h3. Issue Description

When enabling {{commit.force-compact}} for a partitioned managed table, there 
is a chance that successive synchronized writes fail: the {{rewrite}} method 
associates the wrong data file with the {{partition}} and {{{}bucket{}}}.
{code:java}
Caused by: java.io.IOException: java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: java.io.FileNotFoundException: File 
file:/var/folders/xd/9dp1y4vd3h56kjkvdk426l50gn/T/junit5920507275110651781/junit4163667468681653619/default_catalog.catalog/default_database.db/T1/f1=Autumn/bucket-0/data-59826283-c5d1-4344-96ae-2203d4e60a57-0
 does not exist or the user running Flink ('jane.cjm') has insufficient 
permissions to access it. at 
org.apache.flink.table.store.connector.sink.StoreSinkWriter.prepareCommit(StoreSinkWriter.java:172)
{code}
However, data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not belong to 
partition Autumn. It seems the rewriter paired the wrong partition/bucket 
with the wrong file.
h3. How to Reproduce
{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.store.connector;

import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

/** A reproducible case. */
public class ForceCompactionITCase extends FileStoreTableITCase {

@Override
protected List ddl() {
return Collections.singletonList(
"CREATE TABLE IF NOT EXISTS T1 ("
+ "f0 INT, f1 STRING, f2 STRING) PARTITIONED BY (f1)");
}

@Test
public void test() throws ExecutionException, InterruptedException {
bEnv.executeSql("ALTER TABLE T1 SET ('num-levels' = '3')");
bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");
bEnv.executeSql(
"INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming')"
+ ",(2, 'Winter', 'The First Snowflake'), "
+ "(2, 'Spring', 'The First Rose in Spring'), "
+ "(7, 'Summer', 'Summertime Sadness')")
.await();
bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last 
Christmas')").await();
bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is 
Coming')").await();
bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 
'Refrain')").await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon 
Sugar'), "
+ "(4, 'Spring', 'Spring Water')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September 
Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September 
Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September 
Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(6, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September 
Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(66, 'Summer', 'Summer 
Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September 
Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(666, 'Summer', 'Summer 
Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September 
Ends')")
.await();
}
}

{code}

  was:
h3. Issue Description
When enabling {{com

[jira] [Commented] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0

2022-05-29 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543637#comment-17543637
 ] 

Galen Warren commented on FLINK-27813:
--

Cool! Glad it was something simple.

> java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0
> -
>
> Key: FLINK-27813
> URL: https://issues.apache.org/jira/browse/FLINK-27813
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: statefun-3.2.0
>Reporter: Oleksandr Kazimirov
>Priority: Blocker
> Attachments: screenshot-1.png
>
>
> Issue was met after migration from 
> flink-statefun:3.1.1-java11
> to
> flink-statefun:3.2.0-java11
>  
> {code:java}
> ts.execution-paymentManualValidationRequestEgress-egress, Sink: 
> ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: 
> ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 
> (98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalStateException: Unable to parse Netty 
> transport spec.\n\tat 
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat
>  
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat
>  
> org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)\n\tat
>  
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat
>  
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat
>  
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)\n\tat
>  
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)\n\tat
>  
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)\n\tat
>  org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)\n\tat 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)\n\tat 
> java.base/java.lang.Thread.run(Unknown Source)\nCaused by: 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
>  Time interval unit label 'm' does not match any of the recognized units: 
> DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (

[jira] [Closed] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0

2022-05-29 Thread Oleksandr (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr closed FLINK-27813.
-
Resolution: Won't Fix

> java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0
> -
>
> Key: FLINK-27813
> URL: https://issues.apache.org/jira/browse/FLINK-27813
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: statefun-3.2.0
>Reporter: Oleksandr
>Priority: Blocker
> Attachments: screenshot-1.png
>
>
> Issue was met after migration from 
> flink-statefun:3.1.1-java11
> to
> flink-statefun:3.2.0-java11
>  
> {code:java}
> ts.execution-paymentManualValidationRequestEgress-egress, Sink: 
> ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: 
> ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 
> (98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalStateException: Unable to parse Netty 
> transport spec.\n\tat 
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat
>  
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat
>  
> org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)\n\tat
>  
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat
>  
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat
>  
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)\n\tat
>  
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)\n\tat
>  
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)\n\tat
>  org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)\n\tat 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)\n\tat 
> java.base/java.lang.Thread.run(Unknown Source)\nCaused by: 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
>  Time interval unit label 'm' does not match any of the recognized units: 
> DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | 
> minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS

[jira] [Commented] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0

2022-05-29 Thread Oleksandr (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543624#comment-17543624
 ] 

Oleksandr commented on FLINK-27813:
---

[~galenwarren] I found the issue in the configuration in module.yaml.

We used the following config in 3.1.0:
{code:java}
kind: io.statefun.endpoints.v2/http
spec:
  functions: ua.test.execution/*
  urlPathTemplate: http://test-{{ $.Values.global.platformEnvironment 
}}.svc.cluster.local:8080/v1/functions
  transport:
type: io.statefun.transports.v1/async
timeouts:
  call: 6m{code}
Now, in 3.2.0, *6m* has to be written as *6min*.

> java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0
> -
>
> Key: FLINK-27813
> URL: https://issues.apache.org/jira/browse/FLINK-27813
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: statefun-3.2.0
>Reporter: Oleksandr
>Priority: Blocker
> Attachments: screenshot-1.png
>
>
> Issue was met after migration from 
> flink-statefun:3.1.1-java11
> to
> flink-statefun:3.2.0-java11
>  
> {code:java}
> ts.execution-paymentManualValidationRequestEgress-egress, Sink: 
> ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: 
> ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 
> (98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalStateException: Unable to parse Netty 
> transport spec.\n\tat 
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat
>  
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat
>  
> org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)\n\tat
>  
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat
>  
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat
>  
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)\n\tat
>  
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)\n\tat
>  
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)\n\tat
>  org.apache.flink.runtime.taskmanager.Task.doRun(Task.j

[jira] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0

2022-05-29 Thread Oleksandr (Jira)


[ https://issues.apache.org/jira/browse/FLINK-27813 ]


Oleksandr deleted comment on FLINK-27813:
---

was (Author: JIRAUSER290114):
We have a job manager running that is configured via module.yaml (egress and 
ingress for Kafka topics).
Our stateful functions live in a separate remote service built on top of Spring 
Boot, with the following controller:
{code:java}
@RequestMapping("/v1/functions")
@RequiredArgsConstructor
public class FunctionRouteController {

private final RequestReplyHandler handler;

@PostMapping(consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public CompletableFuture<byte[]> onRequest(@RequestBody byte[] request) {
return handler
.handle(Slices.wrap(request))
.thenApply(Slice::toByteArray);
}
} {code}
where we actually use 
'org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler' and 
we just define the beans:
{code:java}
@Bean
public StatefulFunctions statefulFunctions(StatefulFunctions 
autoRegisteredStatefulFunctions) {
// obtain a request-reply handler based on the spec above
StatefulFunctions statefulFunctions = new StatefulFunctions()
.withStatefulFunction(fallbackFn())
.withStatefulFunction(transactionFn());

autoRegisteredStatefulFunctions.functionSpecs().values().forEach(statefulFunctions::withStatefulFunction);
return statefulFunctions;
}

@Bean
public RequestReplyHandler requestReplyHandler(StatefulFunctions 
statefulFunctions) {
return statefulFunctions.requestReplyHandler();
} {code}
 

statefulFunctions.requestReplyHandler() is :
{code:java}
public RequestReplyHandler requestReplyHandler() {
return new ConcurrentRequestReplyHandler(this.specs);
} 

{code}
 

flink-conf.yaml

 
{code:java}
{
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
taskmanager.numberOfTaskSlots: 2
execution.checkpointing.interval: 1min
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 6
execution.checkpointing.timeout: 120
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
state.backend: rocksdb
state.backend.async: true
state.checkpoints.num-retained: 3
state.savepoints.dir: s3://savepoints-dev/savepoints
state.checkpoints.dir: s3://checkpoints-dev/checkpoints
s3.endpoint: https://minio.test
s3.path.style.access: true
s3.access-key: ***
s3.secret-key: ***
state.backend.incremental: true
jobmanager.memory.process.size: 1g
taskmanager.memory.process.size: 4g
}{code}
 

> java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0
> -
>
> Key: FLINK-27813
> URL: https://issues.apache.org/jira/browse/FLINK-27813
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: statefun-3.2.0
>Reporter: Oleksandr
>Priority: Blocker
> Attachments: screenshot-1.png
>
>
> Issue was met after migration from 
> flink-statefun:3.1.1-java11
> to
> flink-statefun:3.2.0-java11
>  
> {code:java}
> ts.execution-paymentManualValidationRequestEgress-egress, Sink: 
> ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: 
> ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 
> (98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalStateException: Unable to parse Netty 
> transport spec.\n\tat 
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat
>  
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat
>  
> org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat
>  
> or

[jira] [Comment Edited] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0

2022-05-29 Thread Oleksandr (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543616#comment-17543616
 ] 

Oleksandr edited comment on FLINK-27813 at 5/29/22 11:13 AM:
-

We have a job manager running that is configured via module.yaml (egress and 
ingress for Kafka topics).
Our stateful functions live in a separate remote service built on top of Spring 
Boot, with the following controller:
{code:java}
@RequestMapping("/v1/functions")
@RequiredArgsConstructor
public class FunctionRouteController {

private final RequestReplyHandler handler;

@PostMapping(consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public CompletableFuture<byte[]> onRequest(@RequestBody byte[] request) {
return handler
.handle(Slices.wrap(request))
.thenApply(Slice::toByteArray);
}
} {code}
where we actually use 
'org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler' and 
we just define the beans:
{code:java}
@Bean
public StatefulFunctions statefulFunctions(StatefulFunctions 
autoRegisteredStatefulFunctions) {
// obtain a request-reply handler based on the spec above
StatefulFunctions statefulFunctions = new StatefulFunctions()
.withStatefulFunction(fallbackFn())
.withStatefulFunction(transactionFn());

autoRegisteredStatefulFunctions.functionSpecs().values().forEach(statefulFunctions::withStatefulFunction);
return statefulFunctions;
}

@Bean
public RequestReplyHandler requestReplyHandler(StatefulFunctions 
statefulFunctions) {
return statefulFunctions.requestReplyHandler();
} {code}
 

statefulFunctions.requestReplyHandler() is :
{code:java}
public RequestReplyHandler requestReplyHandler() {
return new ConcurrentRequestReplyHandler(this.specs);
} 

{code}
 

flink-conf.yaml

 
{code:java}
{
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
taskmanager.numberOfTaskSlots: 2
execution.checkpointing.interval: 1min
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 6
execution.checkpointing.timeout: 120
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
state.backend: rocksdb
state.backend.async: true
state.checkpoints.num-retained: 3
state.savepoints.dir: s3://savepoints-dev/savepoints
state.checkpoints.dir: s3://checkpoints-dev/checkpoints
s3.endpoint: https://minio.test
s3.path.style.access: true
s3.access-key: ***
s3.secret-key: ***
state.backend.incremental: true
jobmanager.memory.process.size: 1g
taskmanager.memory.process.size: 4g
}{code}
 


was (Author: JIRAUSER290114):
[~galenwarren] 

We have a job manager running that is configured via module.yaml (egress and 
ingress for Kafka topics).
Our stateful functions live in a separate remote service built on top of Spring 
Boot, with the following controller:
{code:java}
@RequestMapping("/v1/functions")
@RequiredArgsConstructor
public class FunctionRouteController {

private final RequestReplyHandler handler;

@PostMapping(consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public CompletableFuture<byte[]> onRequest(@RequestBody byte[] request) {
return handler
.handle(Slices.wrap(request))
.thenApply(Slice::toByteArray);
}
} {code}
where we actually use 
'org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler' and 
we just define the beans:
{code:java}
@Bean
public StatefulFunctions statefulFunctions(StatefulFunctions 
autoRegisteredStatefulFunctions) {
// obtain a request-reply handler based on the spec above
StatefulFunctions statefulFunctions = new StatefulFunctions()
.withStatefulFunction(fallbackFn())
.withStatefulFunction(transactionFn());

autoRegisteredStatefulFunctions.functionSpecs().values().forEach(statefulFunctions::withStatefulFunction);
return statefulFunctions;
}

@Bean
public RequestReplyHandler requestReplyHandler(StatefulFunctions 
statefulFunctions) {
return statefulFunctions.requestReplyHandler();
} {code}
 

statefulFunctions.requestReplyHandler() is :
{code:java}
public RequestReplyHandler requestReplyHandler() {
return new ConcurrentRequestReplyHandler(this.specs);
} 

{code}
 

flink-conf.yaml

 
{code:java}
{
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
taskmanager.numberOfTaskSlots: 2
execution.checkpointing.interval: 1min
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 6
execution.checkpointing.timeout: 120
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
state.backend: rocksdb
state.backend.async: true
state.checkpoints.num-retained: 3
state.savepoints.dir: s3://savepoints-dev/savepoints
state.checkpoints.dir: s3://checkpoints-dev/checkpoints
s3.endpoi

[jira] [Comment Edited] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0

2022-05-29 Thread Oleksandr (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543616#comment-17543616
 ] 

Oleksandr edited comment on FLINK-27813 at 5/29/22 11:06 AM:
-

[~galenwarren] 

We have a job manager running that is configured via module.yaml (egress and 
ingress for Kafka topics).
Our stateful functions live in a separate remote service built on top of Spring 
Boot, with the following controller:
{code:java}
@RequestMapping("/v1/functions")
@RequiredArgsConstructor
public class FunctionRouteController {

private final RequestReplyHandler handler;

@PostMapping(consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public CompletableFuture<byte[]> onRequest(@RequestBody byte[] request) {
return handler
.handle(Slices.wrap(request))
.thenApply(Slice::toByteArray);
}
} {code}
where we actually use 
'org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler' and 
we just define the beans:
{code:java}
@Bean
public StatefulFunctions statefulFunctions(StatefulFunctions 
autoRegisteredStatefulFunctions) {
// obtain a request-reply handler based on the spec above
StatefulFunctions statefulFunctions = new StatefulFunctions()
.withStatefulFunction(fallbackFn())
.withStatefulFunction(transactionFn());

autoRegisteredStatefulFunctions.functionSpecs().values().forEach(statefulFunctions::withStatefulFunction);
return statefulFunctions;
}

@Bean
public RequestReplyHandler requestReplyHandler(StatefulFunctions 
statefulFunctions) {
return statefulFunctions.requestReplyHandler();
} {code}
 

statefulFunctions.requestReplyHandler() is :
{code:java}
public RequestReplyHandler requestReplyHandler() {
return new ConcurrentRequestReplyHandler(this.specs);
} 

{code}
 

flink-conf.yaml

 
{code:java}
{
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
taskmanager.numberOfTaskSlots: 2
execution.checkpointing.interval: 1min
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 6
execution.checkpointing.timeout: 120
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
state.backend: rocksdb
state.backend.async: true
state.checkpoints.num-retained: 3
state.savepoints.dir: s3://savepoints-dev/savepoints
state.checkpoints.dir: s3://checkpoints-dev/checkpoints
s3.endpoint: https://minio.test
s3.path.style.access: true
s3.access-key: ***
s3.secret-key: ***
state.backend.incremental: true
jobmanager.memory.process.size: 1g
taskmanager.memory.process.size: 4g
}{code}
 


was (Author: JIRAUSER290114):
We have a job manager running that is configured via module.yaml (egress and 
ingress for Kafka topics).
Our stateful functions live in a separate remote service built on top of Spring 
Boot, with the following controller:
{code:java}
@RequestMapping("/v1/functions")
@RequiredArgsConstructor
public class FunctionRouteController {

private final RequestReplyHandler handler;

@PostMapping(consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public CompletableFuture<byte[]> onRequest(@RequestBody byte[] request) {
return handler
.handle(Slices.wrap(request))
.thenApply(Slice::toByteArray);
}
} {code}
where we actually use 
'org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler' and 
we just define the beans:
{code:java}
@Bean
public StatefulFunctions statefulFunctions(StatefulFunctions 
autoRegisteredStatefulFunctions) {
// obtain a request-reply handler based on the spec above
StatefulFunctions statefulFunctions = new StatefulFunctions()
.withStatefulFunction(fallbackFn())
.withStatefulFunction(transactionFn());

autoRegisteredStatefulFunctions.functionSpecs().values().forEach(statefulFunctions::withStatefulFunction);
return statefulFunctions;
}

@Bean
public RequestReplyHandler requestReplyHandler(StatefulFunctions 
statefulFunctions) {
return statefulFunctions.requestReplyHandler();
} {code}
 

statefulFunctions.requestReplyHandler() is :
{code:java}
public RequestReplyHandler requestReplyHandler() {
return new ConcurrentRequestReplyHandler(this.specs);
} 

{code}
 

flink-conf.yaml

 
{code:java}
{
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
taskmanager.numberOfTaskSlots: 2
execution.checkpointing.interval: 1min
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 6
execution.checkpointing.timeout: 120
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
state.backend: rocksdb
state.backend.async: true
state.checkpoints.num-retained: 3
state.savepoints.dir: s3://savepoints-dev/savepoints
state.checkpoints.dir: s3://checkpoints-dev/checkpoints
s3.endpoi

[jira] [Comment Edited] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0

2022-05-29 Thread Oleksandr (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543616#comment-17543616
 ] 

Oleksandr edited comment on FLINK-27813 at 5/29/22 11:06 AM:
-

We have a job manager running that is configured via module.yaml (egress and 
ingress for Kafka topics).
Our stateful functions live in a separate remote service built on top of Spring 
Boot, with the following controller:
{code:java}
@RequestMapping("/v1/functions")
@RequiredArgsConstructor
public class FunctionRouteController {

private final RequestReplyHandler handler;

@PostMapping(consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public CompletableFuture<byte[]> onRequest(@RequestBody byte[] request) {
return handler
.handle(Slices.wrap(request))
.thenApply(Slice::toByteArray);
}
} {code}
where we actually use 
'org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler' and 
we just define the beans:
{code:java}
@Bean
public StatefulFunctions statefulFunctions(StatefulFunctions 
autoRegisteredStatefulFunctions) {
// obtain a request-reply handler based on the spec above
StatefulFunctions statefulFunctions = new StatefulFunctions()
.withStatefulFunction(fallbackFn())
.withStatefulFunction(transactionFn());

autoRegisteredStatefulFunctions.functionSpecs().values().forEach(statefulFunctions::withStatefulFunction);
return statefulFunctions;
}

@Bean
public RequestReplyHandler requestReplyHandler(StatefulFunctions 
statefulFunctions) {
return statefulFunctions.requestReplyHandler();
} {code}
 

statefulFunctions.requestReplyHandler() is :
{code:java}
public RequestReplyHandler requestReplyHandler() {
return new ConcurrentRequestReplyHandler(this.specs);
} 

{code}
 

flink-conf.yaml

 
{code:java}
{
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
taskmanager.numberOfTaskSlots: 2
execution.checkpointing.interval: 1min
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 6
execution.checkpointing.timeout: 120
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
state.backend: rocksdb
state.backend.async: true
state.checkpoints.num-retained: 3
state.savepoints.dir: s3://savepoints-dev/savepoints
state.checkpoints.dir: s3://checkpoints-dev/checkpoints
s3.endpoint: https://minio.test
s3.path.style.access: true
s3.access-key: ***
s3.secret-key: ***
state.backend.incremental: true
jobmanager.memory.process.size: 1g
taskmanager.memory.process.size: 4g
}{code}
 


was (Author: JIRAUSER290114):
We have a job manager running that is configured via module.yaml (egress and 
ingress for Kafka topics).
Our stateful functions live in a separate remote service built on top of Spring 
Boot, with the following controller:
{code:java}
@RequestMapping("/v1/functions")
@RequiredArgsConstructor
public class FunctionRouteController {

private final RequestReplyHandler handler;

@PostMapping(consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public CompletableFuture<byte[]> onRequest(@RequestBody byte[] request) {
return handler
.handle(Slices.wrap(request))
.thenApply(Slice::toByteArray);
}
} {code}
where we actually use 
'org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler' and 
we just define the beans:
{code:java}
@Bean
public StatefulFunctions statefulFunctions(StatefulFunctions 
autoRegisteredStatefulFunctions) {
// obtain a request-reply handler based on the spec above
StatefulFunctions statefulFunctions = new StatefulFunctions()
.withStatefulFunction(fallbackFn())
.withStatefulFunction(transactionFn());

autoRegisteredStatefulFunctions.functionSpecs().values().forEach(statefulFunctions::withStatefulFunction);
return statefulFunctions;
}

@Bean
public RequestReplyHandler requestReplyHandler(StatefulFunctions 
statefulFunctions) {
return statefulFunctions.requestReplyHandler();
} {code}
 

statefulFunctions.requestReplyHandler() is :
{code:java}
public RequestReplyHandler requestReplyHandler() {
return new ConcurrentRequestReplyHandler(this.specs);
} {code}

> java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0
> -
>
> Key: FLINK-27813
> URL: https://issues.apache.org/jira/browse/FLINK-27813
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: statefun-3.2.0
>Reporter: Oleksandr
>Priority: Blocker
> Attachments: screenshot-1.png
>
>
> Issue was met after migration from 
> flink-statefun:3.1.1-java11
> to
> flink-statefun:3.2.0-java11
>  
> {code:java}
> ts.execution-paymentMan

[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #244: [FLINK-27520] Use admission-controller-framework in Webhook

2022-05-29 Thread GitBox


gyfora commented on code in PR #244:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/244#discussion_r884251881


##
flink-kubernetes-webhook/pom.xml:
##
@@ -36,6 +36,19 @@ under the License.
 org.apache.flink
 flink-kubernetes-operator
 ${project.version}
+provided
+
+
+
+io.javaoperatorsdk
+operator-framework-framework-core
+${operator.sdk.admission-controller.version}
+
+
+*

Review Comment:
   I think the slightly cleaner approach here would be to remove the exclude-all 
and explicitly include the artifacts in the shade plugin:
   
   ```
   <configuration>
       <artifactSet>
           <includes>
               <include>io.javaoperatorsdk:operator-framework-framework-core</include>
           </includes>
       </artifactSet>
       <transformers>
           <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
               <mainClass>org.apache.flink.kubernetes.operator.admission.FlinkOperatorWebhook</mainClass>
           </transformer>
       </transformers>
   </configuration>
   ``` 
   
   This will be more flexible in the future when we need to add 1-2 extra 
packages here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #239: [FLINK-27714] Migrate to java-operator-sdk v3

2022-05-29 Thread GitBox


gyfora commented on PR #239:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/239#issuecomment-1140423713

   +1 for waiting 1-2 days for the JOSDK fixes before merging


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0

2022-05-29 Thread Oleksandr (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543616#comment-17543616
 ] 

Oleksandr commented on FLINK-27813:
---

We have a job manager running that is configured via module.yaml (egress and 
ingress for Kafka topics).
Our stateful functions live in a separate remote service built on top of Spring 
Boot, with the following controller:
{code:java}
@RequestMapping("/v1/functions")
@RequiredArgsConstructor
public class FunctionRouteController {

private final RequestReplyHandler handler;

@PostMapping(consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public CompletableFuture<byte[]> onRequest(@RequestBody byte[] request) {
return handler
.handle(Slices.wrap(request))
.thenApply(Slice::toByteArray);
}
} {code}
where we actually use 
'org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler' and 
we just define the beans:
{code:java}
@Bean
public StatefulFunctions statefulFunctions(StatefulFunctions 
autoRegisteredStatefulFunctions) {
// obtain a request-reply handler based on the spec above
StatefulFunctions statefulFunctions = new StatefulFunctions()
.withStatefulFunction(fallbackFn())
.withStatefulFunction(transactionFn());

autoRegisteredStatefulFunctions.functionSpecs().values().forEach(statefulFunctions::withStatefulFunction);
return statefulFunctions;
}

@Bean
public RequestReplyHandler requestReplyHandler(StatefulFunctions 
statefulFunctions) {
return statefulFunctions.requestReplyHandler();
} {code}
 

statefulFunctions.requestReplyHandler() is :
{code:java}
public RequestReplyHandler requestReplyHandler() {
return new ConcurrentRequestReplyHandler(this.specs);
} {code}

> java.lang.IllegalStateException: after migration from statefun-3.1.1 to 3.2.0
> -
>
> Key: FLINK-27813
> URL: https://issues.apache.org/jira/browse/FLINK-27813
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: statefun-3.2.0
>Reporter: Oleksandr
>Priority: Blocker
> Attachments: screenshot-1.png
>
>
> Issue was met after migration from 
> flink-statefun:3.1.1-java11
> to
> flink-statefun:3.2.0-java11
>  
> {code:java}
> ts.execution-paymentManualValidationRequestEgress-egress, Sink: 
> ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: 
> ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 
> (98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with 
> failure cause: java.lang.IllegalStateException: Unable to parse Netty 
> transport spec.\n\tat 
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat
>  
> org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat
>  
> org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat
>  
> org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat
>  
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat
>  
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat
>  
> org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat
>  
> org.apache.flink.st

[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #247: [FLINK-27668] Document dynamic operator configuration

2022-05-29 Thread GitBox


gyfora commented on code in PR #247:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/247#discussion_r884250202


##
docs/content/docs/operations/configuration.md:
##
@@ -50,11 +50,50 @@ defaultConfiguration:
 
 To learn more about metrics and logging configuration please refer to the 
dedicated [docs page]({{< ref "docs/operations/metrics-logging" >}}).
 
+## Dynamic Operator Configuration
+
+The Kubernetes operator supports dynamic config changes through the operator 
ConfigMaps. Dynamic operator configuration can be enabled by setting 
`kubernetes.operator.dynamic.config.enabled` to true. The time interval for 
checking dynamic config changes is specified by 
`kubernetes.operator.dynamic.config.check.interval`; its default value is 5 
minutes. 
+
+You can verify that dynamic operator configuration updates are enabled by 
checking whether the `deploy/flink-kubernetes-operator` log contains:
+
+```
+2022-05-28 13:08:29,222 o.a.f.k.o.c.FlinkConfigManager [INFO ] Enabled dynamic 
config updates, checking config changes every PT5M
+```
+
+For example, the default configuration of `flink-conf.yaml` in the 
`defaultConfiguration` section of the Helm `values.yaml` file is as follows:
+
+```
+defaultConfiguration:
+  create: true
+  append: false
+  flink-conf.yaml: |+
+# Flink Config Overrides
+kubernetes.operator.reconciler.reschedule.interval: 60 s
+```
+
+To change the interval at which the controller reschedules the reconcile process 
to 30 seconds, simply set `kubernetes.operator.reconciler.reschedule.interval` 
in the `defaultConfiguration` section to 30s:
+
+```
+defaultConfiguration:
+  create: true
+  append: false
+  flink-conf.yaml: |+
+# Flink Config Overrides
+kubernetes.operator.reconciler.reschedule.interval: 30 s
+```

Review Comment:
   I think this is not a good example because if you try to update the 
ConfigMap through the Helm chart (using `helm upgrade`), nothing will happen: 
Helm only checks whether the ConfigMap is present, it will not update it.
   
   We should simply show an example of editing the configmap directly. 
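   
   For example, a minimal sketch (the ConfigMap name below is the Helm chart default and is an assumption; it may differ in your installation):
   
   ```
   kubectl edit configmap flink-operator-config -n <operator-namespace>
   ```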



##
docs/content/docs/operations/configuration.md:
##
@@ -50,11 +50,50 @@ defaultConfiguration:
 
 To learn more about metrics and logging configuration please refer to the 
dedicated [docs page]({{< ref "docs/operations/metrics-logging" >}}).
 
+## Dynamic Operator Configuration
+
+The Kubernetes operator supports dynamic config changes through the operator 
ConfigMaps. Dynamic operator configuration can be enabled by setting 
`kubernetes.operator.dynamic.config.enabled` to true. The time interval for 
checking dynamic config changes is specified by 
`kubernetes.operator.dynamic.config.check.interval`; its default value is 5 
minutes. 

Review Comment:
   We should say that it is enabled by default, and can be **disabled** with 
the config.
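   
   For example, a sketch of the override that would disable it:
   
   ```
   kubernetes.operator.dynamic.config.enabled: false
   ```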



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

2022-05-29 Thread GitBox


Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r884246309


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##
@@ -79,8 +81,19 @@ public class DeclarativeSlotManager implements SlotManager {
 private final Map jobMasterTargetAddresses = new 
HashMap<>();
 private final Map pendingSlotAllocations;
 
+/** Delay of the requirement change check in the slot manager. */
+private final Duration requirementsCheckDelay;

Review Comment:
   do you mean create another ticker for this ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

2022-05-29 Thread GitBox


KarmaGYZ commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r884242067


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##
@@ -79,8 +81,19 @@ public class DeclarativeSlotManager implements SlotManager {
 private final Map jobMasterTargetAddresses = new 
HashMap<>();
 private final Map pendingSlotAllocations;
 
+/** Delay of the requirement change check in the slot manager. */
+private final Duration requirementsCheckDelay;

Review Comment:
   I tend to move this to a preceding issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ChengkaiYang2022 commented on pull request #19681: [FLINK-27544][docs]Example code in 'Structure of Table API and SQL Programs' is out of date and cannot run.

2022-05-29 Thread GitBox


ChengkaiYang2022 commented on PR #19681:
URL: https://github.com/apache/flink/pull/19681#issuecomment-1140390388

   Hi, could you @MartijnVisser help to check it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org