[GitHub] carbondata pull request #1638: [CARBONDATA-1879][Streaming] Support alter ta...
Github user asfgit closed the pull request at: https://github.com/apache/carbondata/pull/1638 ---
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1638#discussion_r156832490

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala ---
    @@ -0,0 +1,37 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.command.management
    +
    +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
    +import org.apache.spark.sql.execution.command.DataCommand
    +
    +import org.apache.carbondata.streaming.segment.StreamSegment
    +
    +/**
    + * This command will try to change the status of the segment from "streaming" to "streaming finish"
    + */
    +case class CarbonAlterTableFinishStreaming(
    +    dbName: Option[String],
    +    tableName: String)
    +  extends DataCommand {
    --- End diff --

    fixed

---
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1638#discussion_r156834294

    --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java ---
    @@ -180,6 +182,70 @@ public static String close(CarbonTable table, String segmentId)
         }
       }
    +  /**
    +   * change the status of the segment from "streaming" to "streaming finish"
    +   */
    +  public static void finishStreaming(CarbonTable carbonTable) throws Exception {
    +    ICarbonLock lock = CarbonLockFactory.getCarbonLockObj(
    +        carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
    +        LockUsage.TABLE_STATUS_LOCK);
    +    try {
    +      if (lock.lockWithRetries()) {
    +        ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj(
    +            carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
    +            LockUsage.STREAMING_LOCK);
    +        try {
    +          if (streamingLock.lockWithRetries()) {
    +            LoadMetadataDetails[] details =
    +                SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
    +            boolean updated = false;
    +            for (LoadMetadataDetails detail : details) {
    +              if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
    +                detail.setLoadEndTime(System.currentTimeMillis());
    +                detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH);
    +                updated = true;
    +              }
    +            }
    +            if (updated) {
    +              CarbonTablePath tablePath =
    +                  CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
    +              SegmentStatusManager.writeLoadDetailsIntoFile(
    +                  tablePath.getTableStatusFilePath(), details);
    +            }
    +          } else {
    +            String msg = "Failed to finish streaming, because streaming is locked for table " +
    +                carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
    +            LOGGER.error(msg);
    +            throw new Exception(msg);
    +          }
    +        } finally {
    +          if (streamingLock.unlock()) {
    +            LOGGER.info("Table unlocked successfully after streaming finished" + carbonTable
    +                .getDatabaseName() + "." + carbonTable.getTableName());
    +          } else {
    +            LOGGER.error("Unable to unlock Table lock for table " +
    +                carbonTable.getDatabaseName() + "." + carbonTable.getTableName() +
    +                " during streaming finished");
    +          }
    +        }
    +      } else {
    +        String msg = "Failed to acquire table status lock of " +
    +            carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
    +        LOGGER.error(msg);
    +        throw new Exception(msg);
    --- End diff --

    fixed

---
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1638#discussion_r156834271

    --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java ---
    @@ -180,6 +182,70 @@ public static String close(CarbonTable table, String segmentId)
         }
       }
    +  /**
    +   * change the status of the segment from "streaming" to "streaming finish"
    +   */
    +  public static void finishStreaming(CarbonTable carbonTable) throws Exception {
    +    ICarbonLock lock = CarbonLockFactory.getCarbonLockObj(
    +        carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
    +        LockUsage.TABLE_STATUS_LOCK);
    +    try {
    +      if (lock.lockWithRetries()) {
    +        ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj(
    +            carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
    +            LockUsage.STREAMING_LOCK);
    --- End diff --

    fixed

---
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1638#discussion_r156832564

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
    @@ -129,6 +129,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
             CarbonAlterTableCompactionCommand(altertablemodel)
         }
    +  protected lazy val alterTableFinishStreaming: Parser[LogicalPlan] =
    --- End diff --

    fixed

---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1638#discussion_r156599142

    --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java ---
    @@ -180,6 +182,70 @@ public static String close(CarbonTable table, String segmentId)
         }
       }
    +  /**
    +   * change the status of the segment from "streaming" to "streaming finish"
    +   */
    +  public static void finishStreaming(CarbonTable carbonTable) throws Exception {
    +    ICarbonLock lock = CarbonLockFactory.getCarbonLockObj(
    +        carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
    +        LockUsage.TABLE_STATUS_LOCK);
    +    try {
    +      if (lock.lockWithRetries()) {
    +        ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj(
    +            carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
    +            LockUsage.STREAMING_LOCK);
    --- End diff --

    This lock should be acquired first, because the handoff flow acquires it first.

---
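The ordering concern in the comment above can be illustrated with a minimal Java sketch. It is not CarbonData code: `ReentrantLock` stands in for `ICarbonLock`, and the class name `LockOrderSketch`, its fields, and the `finishStreaming(Runnable)` signature are hypothetical. It only shows the general technique, assuming every flow takes the streaming lock before the table status lock and releases them in reverse order, so that two flows cannot each hold one lock while waiting for the other.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch only: ReentrantLock is a stand-in for the table-level locks in the review. */
final class LockOrderSketch {
  // Hypothetical stand-ins for the streaming lock and the table status lock.
  static final ReentrantLock STREAMING_LOCK = new ReentrantLock();
  static final ReentrantLock TABLE_STATUS_LOCK = new ReentrantLock();

  /** Records the order in which the locks were taken, for illustration only. */
  static final List<String> ACQUIRED = new ArrayList<>();

  /**
   * Takes the streaming lock first, then the table status lock, runs the
   * update, and releases both in reverse order. Returns false if either
   * lock is held by another thread.
   */
  static boolean finishStreaming(Runnable updateStatus) {
    if (!STREAMING_LOCK.tryLock()) {
      return false;
    }
    try {
      ACQUIRED.add("streaming");
      if (!TABLE_STATUS_LOCK.tryLock()) {
        return false;
      }
      try {
        ACQUIRED.add("table_status");
        updateStatus.run();
        return true;
      } finally {
        TABLE_STATUS_LOCK.unlock();
      }
    } finally {
      STREAMING_LOCK.unlock();
    }
  }
}
```

Any other flow that needs both locks would follow the same order; the sketch does not model retries, which the reviewed code handles through `lockWithRetries()`.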
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1638#discussion_r156597691

    --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java ---
    @@ -180,6 +182,70 @@ public static String close(CarbonTable table, String segmentId)
         }
       }
    +  /**
    +   * change the status of the segment from "streaming" to "streaming finish"
    +   */
    +  public static void finishStreaming(CarbonTable carbonTable) throws Exception {
    +    ICarbonLock lock = CarbonLockFactory.getCarbonLockObj(
    +        carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
    +        LockUsage.TABLE_STATUS_LOCK);
    +    try {
    +      if (lock.lockWithRetries()) {
    +        ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj(
    +            carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
    +            LockUsage.STREAMING_LOCK);
    +        try {
    +          if (streamingLock.lockWithRetries()) {
    +            LoadMetadataDetails[] details =
    +                SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
    +            boolean updated = false;
    +            for (LoadMetadataDetails detail : details) {
    +              if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
    +                detail.setLoadEndTime(System.currentTimeMillis());
    +                detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH);
    +                updated = true;
    +              }
    +            }
    +            if (updated) {
    +              CarbonTablePath tablePath =
    +                  CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
    +              SegmentStatusManager.writeLoadDetailsIntoFile(
    +                  tablePath.getTableStatusFilePath(), details);
    +            }
    +          } else {
    +            String msg = "Failed to finish streaming, because streaming is locked for table " +
    +                carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
    +            LOGGER.error(msg);
    +            throw new Exception(msg);
    +          }
    +        } finally {
    +          if (streamingLock.unlock()) {
    +            LOGGER.info("Table unlocked successfully after streaming finished" + carbonTable
    +                .getDatabaseName() + "." + carbonTable.getTableName());
    +          } else {
    +            LOGGER.error("Unable to unlock Table lock for table " +
    +                carbonTable.getDatabaseName() + "." + carbonTable.getTableName() +
    +                " during streaming finished");
    +          }
    +        }
    +      } else {
    +        String msg = "Failed to acquire table status lock of " +
    +            carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
    +        LOGGER.error(msg);
    +        throw new Exception(msg);
    --- End diff --

    Change this to IOException.

---
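A minimal sketch of what this suggestion could look like, assuming the lock failure is reported as `java.io.IOException` rather than the generic `Exception`. The class `FinishStreamingErrors`, the helpers `requireLock` and `describe`, and the table name `default.stream_table` are hypothetical and only serve the illustration; they do not come from the pull request.

```java
import java.io.IOException;

/** Sketch only: shows a narrower checked exception for a failed lock acquisition. */
final class FinishStreamingErrors {

  /** Hypothetical helper: fails with IOException when the lock was not obtained. */
  static void requireLock(boolean acquired, String lockName, String table)
      throws IOException {
    if (!acquired) {
      throw new IOException("Failed to acquire " + lockName + " of " + table);
    }
  }

  /** A caller can handle the lock failure without catching every Exception. */
  static String describe(boolean acquired) {
    try {
      requireLock(acquired, "table status lock", "default.stream_table");
      return "ok";
    } catch (IOException e) {
      return e.getMessage();
    }
  }
}
```

With the narrower type, unrelated runtime failures are no longer swallowed by a broad `catch (Exception e)` at the call site.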
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1638#discussion_r156597163

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala ---
    @@ -0,0 +1,37 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.command.management
    +
    +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
    +import org.apache.spark.sql.execution.command.DataCommand
    +
    +import org.apache.carbondata.streaming.segment.StreamSegment
    +
    +/**
    + * This command will try to change the status of the segment from "streaming" to "streaming finish"
    + */
    +case class CarbonAlterTableFinishStreaming(
    +    dbName: Option[String],
    +    tableName: String)
    +  extends DataCommand {
    --- End diff --

    This should be a metadata command.

---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1638#discussion_r156596322

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
    @@ -129,6 +129,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
             CarbonAlterTableCompactionCommand(altertablemodel)
         }
    +  protected lazy val alterTableFinishStreaming: Parser[LogicalPlan] =
    --- End diff --

    Add a comment describing the syntax.

---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1638#discussion_r156596190

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala ---
    @@ -0,0 +1,37 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.command.management
    +
    +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
    +import org.apache.spark.sql.execution.command.DataCommand
    +
    +import org.apache.carbondata.streaming.segment.StreamSegment
    +
    +/**
    + * This command will try to change the status of the segment from "streaming" to "streaming finish"
    + */
    +case class CarbonAlterTableFinishStreaming(
    +    dbName: Option[String],
    +    tableName: String)
    +  extends DataCommand {
    --- End diff --

    This is not a data command.

---