Nesta etapa da série será possível entender o funcionamento das TVFs (Table Valued Function) denominadas [cdc].[fn_cdc_get_all_changes_…] e [cdc].[fn_cdc_get_net_changes_…] e também da IF (Inline Table-Valued Function) denominada [sys].[fn_cdc_map_time_to_lsn] usada para transformar o conteúdo de data em LSN (Log Serial Number). Essas funções são usadas para manipulação dos dados contidos nas tabelas de CDC.
Um outro ponto importante com relação ao entendimento dessas funções é que elas são usadas internamente pelo Integration Services dentro do componente de origem de dados do CDC. Ou seja, seus conceitos são um ponto muito importante mais tarde, quando estivermos gerando as cargas de ETL.
Como foi explanado na Parte 3 do conjunto de posts, o CDC cria dentro das tabelas de histórico algumas colunas onde, através delas, é possível realizar o controle de versão dos registros com base nos LSNs do Log de Transação.
Trabalhar com base no log é complexo, dessa forma, é possível que realizemos a conversão de LSN em data através da IF [sys].[fn_cdc_map_time_to_lsn]. Essa função captura a data desejada com base em algums parâmetros e transforma internamento esse range de período no LSN correto a ser interpretado pela função [cdc].[fn_cdc_get_all_changes_…] ou [cdc].[fn_cdc_get_net_changes_…].
Para trabalhar com a função [sys].[fn_cdc_map_time_to_lsn] é necessário entender acerca de sues parâmetros que são:
@relacional_operator – É usado para identificar um valor de LSN distinto dentro da tabela cdc.lsn_time_mapping com um tran_end_time associado que satisfaça a relação quando comparado ao valor @tracking_time (BOL). Dentro desse parâmetro é possível usar os seguintes valores:
‘smallest greater than’ ou ‘smallest greater than or equal’, onde:
‘largest less than’ ou ‘largest less than or equal’, onde:
@tracking_time – É o valor de data e hora ao qual corresponder. tracking_time é datetime (BOL).
Agora que compreendida a Função [sys].[fn_cdc_map_time_to_lsn] você irá iniciar o entendimento dos parâmetros das funções [cdc].[fn_cdc_get_all_changes_…] e [cdc].[fn_cdc_get_net_changes_…], onde cada uma delas possui os seguintes parâmetros:
@from_lsn – LSN de início da transação;
@to_lsn – LSN de fim da transação;
@row_filter_option – Rege o conteúdo das colunas de metadados, bem como as linhas retornadas no conjunto de resultados.
Para trabalhar com ambas as functions para manipulação de registros é necessário usar o código abaixo. Lembrando que, conforme foi dito na Parte 02 do conjunto de posts, o suporte a Net Changes somente é habilitado caso o parâmetro @supports_net_changes seja preenchido como um na hora de habilitar o CDC para tabela. Caso não, a função [cdc].[fn_cdc_get_net_changes_…] não será criada.
-- =========================== -- checking get_all funtion -- -- =========================== USE AdventureWorks2014 GO -- function all change - Used for capture all changes in tables with cdc excluding the last value in update modification DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = GETDATE()-1; SET @end_time = GETDATE(); SET @from_lsn = sys.fn_cdc_map_time_to_lsn(/*<relational_operator>*/'smallest greater than', @begin_time); -- the parameter <relational_operator> may have two values: SMALLEST GREATER THAN OR EQUAL or SMALLEST GREATER THAN SET @to_lsn = sys.fn_cdc_map_time_to_lsn(/*<relational_operator>*/'largest less than or equal', @end_time); -- the parameter <relational_operator> may have two values: LARGEST LESS THAN OR EQUAL or LARGEST LESS THAN SELECT * FROM cdc.fn_cdc_get_all_changes_Person_Person(@from_lsn, @to_lsn, /*<row_filter_options>*/'all update old'); -- the parameter <row_filter_options> may have three values: ALL and ALL UPDATE OLD -- =========================== -- checking get_net funtion -- -- =========================== USE AdventureWorks2014 GO -- function net change - Used for capture the last value of the data, dispensing information of the update and delete statements DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = GETDATE() -1; SET @end_time = GETDATE(); SET @from_lsn = sys.fn_cdc_map_time_to_lsn(/*<relational_operator>*/'smallest greater than', @begin_time); -- the parameter <relational_operator> may have two values: SMALLEST GREATER THAN OR EQUAL or SMALLEST GREATER THAN SET @to_lsn = sys.fn_cdc_map_time_to_lsn(/*<relational_operator>*/'largest less than or equal', @end_time); -- the parameter <relational_operator> may have two values: LARGEST LESS THAN OR EQUAL or LARGEST LESS THAN SELECT * FROM cdc.fn_cdc_get_net_changes_Person_Person(@from_lsn, @to_lsn, /*<row_filter_options>*/'all with mask'); -- the parameter <row_filter_options> may have three values: ALL, ALL WITH MASK (return the final LSN of the row) and ALL WITH MERGE
Primeiramente execute os comandos abaixo que irão Inserir e Deletar alguns registros na tabela Person.Person e, também alguns Updates nos seus dados:
USE AdventureWorks2014 GO -- ==================================== -- ----- insert operation test -------- -- ==================================== INSERT INTO [Person].[BusinessEntity] ([rowguid] ,[ModifiedDate]) VALUES ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50011',GETDATE()), ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50081',GETDATE()), ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50082',GETDATE()), ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50083',GETDATE()), ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50084',GETDATE()), ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50085',GETDATE()), ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50086',GETDATE()), ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50090',GETDATE()), ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50088',GETDATE()), ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50089',GETDATE()) GO INSERT INTO [Person].[Person] ([BusinessEntityID] ,[PersonType] ,[NameStyle] ,[Title] ,[FirstName] ,[MiddleName] ,[LastName] ,[Suffix] ,[EmailPromotion] ,[AdditionalContactInfo] ,[Demographics] ,[rowguid] ,[ModifiedDate]) VALUES (20778,'IN',0,NULL,'Arthur','J','Luz',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50011',GETDATE()), (20779,'IN',0,NULL,'Heitor','D','Luz',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50081',GETDATE()), (20780,'IN',0,NULL,'Ulisses','L','Silva Neto',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50082',GETDATE()), (20781,'IN',0,NULL,'Raquel','F','Nascimento',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50083',GETDATE()), (20782,'IN',0,NULL,'Miguel','C','Luz',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50084',GETDATE()), (20783,'IN',0,NULL,'Rafael','M','Guedes',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50085',GETDATE()), (20784,'IN',0,NULL,'Marcelo','F','Moreira',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50086',GETDATE()), (20785,'IN',0,NULL,'Ana','L','Dos Santos',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50090',GETDATE()), (20786,'IN',0,NULL,'Leticia','B','Mendes',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50088',GETDATE()), (20787,'IN',0,NULL,'Lucas','A','Cruz',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50089',GETDATE()) GO -- ==================================== -- ----- delete operation test -------- -- ==================================== DELETE FROM [Person].[Person] WHERE [BusinessEntityID] IN ( 20785, 20786, 20787 ) GO -- ==================================== -- ----- update operation test -------- -- ==================================== UPDATE [Person].[Person] SET [FirstName] = 'William', [LastName] = 'Golçalves', [MiddleName] = 'D' WHERE [BusinessEntityID] = 20783 GO UPDATE [Person].[Person] SET [Title] = 'Sr.' WHERE [BusinessEntityID] = 20778 GO UPDATE [Person].[Person] SET [PersonType] = 'SC' WHERE [BusinessEntityID] = 20782 GO UPDATE [Person].[Person] SET [PersonType] = 'SC', [Title] = 'Sr.' WHERE [BusinessEntityID] IN (1,2,3,4) GO
Para facilitar o entendimento, para cada uma das opções de @row_filter_option de cada uma das TVFs, você usará um dos códigos abaixo (partindo do pressuposto de que o ambiente já foi criado seguindo os passos anteriores do conjunto de posts):
[fn_cdc_get_all_changes_…] – Value ALL
Usado para retornar todos registros da tabela de histórico do CDC eliminando os registros que possuem o valor 3 no campo [__$operation] pois estes registros são os valores anteriores ao valor atual do update (vide Parte 3 do conjunto de posts).
USE AdventureWorks2014 GO -- get_all_changes with row filter options equal all DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = GETDATE()-1; SET @end_time = GETDATE(); SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @end_time); SELECT * FROM cdc.fn_cdc_get_all_changes_Person_Person(@from_lsn, @to_lsn, 'all'); GO
[fn_cdc_get_all_changes_…] – Value ALL UPDATE OLD
Usado para retornar todos registros da tabela de histórico do CDC incluindo os registros que possuem o valor 3 no campo [__$operation].
USE AdventureWorks2014 GO -- get_all_changes with row filter options equal all update old DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = GETDATE()-1; SET @end_time = GETDATE(); SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @end_time); SELECT * FROM cdc.fn_cdc_get_all_changes_Person_Person(@from_lsn, @to_lsn, 'all update old'); GO
[fn_cdc_get_net_changes_…] – Value ALL
Retorna somente a ultima atualização para cada um dos registros da tabela de histórico. Por exemplo, caso um registro tenha sido inserido, sofrido um update e depois deletado, a query abaixo irá mostrar apenas o registro deletado.
A coluna [__$update_mask] aparece com o valor NULL.
USE AdventureWorks2014 GO -- get_all_changes with row filter options equal all DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = GETDATE()-1; SET @end_time = GETDATE(); SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @end_time); SELECT * FROM cdc.fn_cdc_get_net_changes_Person_Person(@from_lsn, @to_lsn, 'all'); GO
[fn_cdc_get_net_changes_…] – Value ALL WITH MASK
Retorna somente a ultima atualização para cada um dos registros da tabela de histórico assim como na situação anterior, porém, nesse caso, quando a coluna [__$operation] retornar o valor 4 (novo valor de um update), temos a coluna [__$update_mask] preenchida com a máscara da ultima atualização.
Você entenderá melhor sobre a utilização dessa coluna mais abaixo neste post.
USE AdventureWorks2014 GO -- get_all_changes with row filter options equal all with mask DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = GETDATE()-1; SET @end_time = GETDATE(); SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @end_time); SELECT * FROM cdc.fn_cdc_get_net_changes_Person_Person(@from_lsn, @to_lsn, 'all with mask'); GO
[fn_cdc_get_net_changes_…] – Value ALL WITH MERGE
Retorna somente a ultima atualização para cada um dos registros da tabela de histórico, porém realiza um merge entre os registros Inseridos e Atualizados mostrando, neste caso, para ambas as situações, a coluna [__$operation] com valor igual a 5. O valor igual a 1 é usado pra deleção.
USE AdventureWorks2014 GO --delete registry existing before cdc initialization ALTER TABLE [Person].[EmailAddress] DROP CONSTRAINT [FK_EmailAddress_Person_BusinessEntityID] GO ALTER TABLE [HumanResources].[Employee] DROP CONSTRAINT [FK_Employee_Person_BusinessEntityID] GO ALTER TABLE [Person].[Password] DROP CONSTRAINT [FK_Password_Person_BusinessEntityID] GO ALTER TABLE [Person].[PersonPhone] DROP CONSTRAINT [FK_PersonPhone_Person_BusinessEntityID] GO DELETE FROM [Person].[Person] WHERE [BusinessEntityID] IN (10,11,12) GO -- get_all_changes with row filter options equal all with merge DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = GETDATE()-1; SET @end_time = GETDATE(); SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @end_time); SELECT * FROM cdc.fn_cdc_get_net_changes_Person_Person(@from_lsn, @to_lsn, 'all with merge'); GO
Observer que para os registros de [BusinessEntityID] igual a 20785, 20786 e 20787 da tabela [Person].[Person] que foram inseridos hoje e também deletados hoje, ao realizar a operação de merge de acordo com o Range de data que foi passado (de ontem para hoje), o CDC desconsidera a operação de INSERT e nem o apresenta no retorno do SELECT. É como se esse registro não houvesse sido inserido.
Porém, para os registros 10, 11 e 12 que já faziam parte da tabela, ele mostra no select para a coluna [__$operation] o valor 1.
A coluna [__$update_mask] é responsável por “armazena uma máscara de bits com base nos ordinais de coluna da tabela de alteração que identificam as colunas que foram alteradas” (BOL).
Em outras palavras, através dela é possível descobrir quais colunas foram alteradas dentro de uma instrução de UPDATE. Caso a instrução seja de DELETE ou INSERT todas as colunas serão apresentadas.
No código será possível verificar quais registros, de acordo com o range de data fornecido, sofreram alterações de UPDATE ([__$operation] = 4) na coluna [Title] da tabela [Person].[Person]:
-- ======================================= -- understand the column __$update_mask -- -- ======================================= DECLARE @StartDate datetime = GETDATE()-1; DECLARE @EndDate datetime = GETDATE(); DECLARE @begin_lsn BINARY(10); DECLARE @end_lsn BINARY(10); -- Set the LSN values SELECT @begin_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @StartDate); SELECT @end_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @EndDate); -- get all updated rows where Title was updated SELECT * FROM cdc.fn_cdc_get_all_changes_Person_Person(@begin_lsn, @end_lsn, 'all') WHERE __$operation = 4 AND sys.fn_cdc_is_bit_set ( sys.fn_cdc_get_column_ordinal ('Person_Person','Title'),__$update_mask ) = 1 GO
Já usando o código abaixo será possivel identificar quais colunas foram alteradas para cada um dos registros da tabela de histórico do CDC capturando somente aqueles que possuem valor de [__$operation] = 4 (todos os valores de UPDATE).
-- ======================================= -- understand the column __$update_mask -- -- ======================================= -- capture all columns changed in each transaction SELECT ( SELECT CC.column_name + ',' FROM cdc.captured_columns CC INNER JOIN cdc.change_tables CT ON CC.[object_id] = CT.[object_id] WHERE capture_instance = 'Person_Person' AND sys.fn_cdc_is_bit_set(CC.column_ordinal, PD.__$update_mask) = 1 FOR XML PATH('') ) AS ChangedColumns, * FROM cdc.Person_Person_CT PD WHERE PD.__$operation = 4 GO
No proximo post da série será possível aprender sobre as melhores práticas e observações para a utilização do Change Data Capture. Após isso, iniciaremos a etapa de construção de ETLs para consumo da feature em cargas de dados de Data Warehouse (DWs) e/ou Operational Data Stores (ODSs).