CDC – Parte 05

Nesta etapa da série será possível entender o funcionamento das TVFs (Table Valued Function) denominadas [cdc].[fn_cdc_get_all_changes_…][cdc].[fn_cdc_get_net_changes_…] e também da IF (Inline Table-Valued Function) denominada [sys].[fn_cdc_map_time_to_lsn] usada para transformar o conteúdo de data em LSN (Log Serial Number). Essas funções são usadas para manipulação dos dados contidos nas tabelas de CDC.

Um outro ponto importante com relação ao entendimento dessas funções é que elas são usadas internamente pelo Integration Services dentro do componente de origem de dados do CDC. Ou seja, seus conceitos são um ponto muito importante mais tarde, quando estivermos gerando as cargas de ETL.


Como foi explanado na Parte 3 do conjunto de posts, o CDC cria dentro das tabelas de histórico algumas colunas onde, através delas, é possível realizar o controle de versão dos registros com base nos LSNs do Log de Transação.

Trabalhar com base no log é complexo, dessa forma, é possível que realizemos a conversão de LSN em data através da IF [sys].[fn_cdc_map_time_to_lsn]. Essa função captura a data desejada com base em algums parâmetros e transforma internamento esse range de período no LSN correto a ser interpretado pela função [cdc].[fn_cdc_get_all_changes_…] ou [cdc].[fn_cdc_get_net_changes_…].

Para trabalhar com a função [sys].[fn_cdc_map_time_to_lsn] é necessário entender acerca de sues parâmetros que são:

@relacional_operator – É usado para identificar um valor de LSN distinto dentro da tabela cdc.lsn_time_mapping com um tran_end_time associado que satisfaça a relação quando comparado ao valor @tracking_time (BOL). Dentro desse parâmetro é possível usar os seguintes valores:

‘smallest greater than’ ou ‘smallest greater than or equal’, onde:

smallest

‘largest less than’ ou ‘largest less than or equal’, onde:

largest

@tracking_time – É o valor de data e hora ao qual corresponder. tracking_time é datetime (BOL).


Agora que compreendida a Função [sys].[fn_cdc_map_time_to_lsn] você irá iniciar o entendimento dos parâmetros das funções [cdc].[fn_cdc_get_all_changes_…][cdc].[fn_cdc_get_net_changes_…], onde cada uma delas possui os seguintes parâmetros:

@from_lsn – LSN de início da transação;

@to_lsn – LSN de fim da transação;

@row_filter_option – Rege o conteúdo das colunas de metadados, bem como as linhas retornadas no conjunto de resultados.

Para trabalhar com ambas as functions para manipulação de registros é necessário usar o código abaixo. Lembrando que, conforme foi dito na Parte 02 do conjunto de posts,  o suporte a Net Changes somente é habilitado caso o parâmetro @supports_net_changes seja preenchido como um na hora de habilitar o CDC para tabela. Caso não, a função [cdc].[fn_cdc_get_net_changes_…] não será criada.


-- ===========================
-- checking get_all funtion --
-- ===========================

USE AdventureWorks2014
GO

-- function all change - Used for capture all changes in tables with cdc excluding the last value in update modification
DECLARE @begin_time datetime,
@end_time datetime,
@from_lsn binary(10),
@to_lsn binary(10);

SET @begin_time = GETDATE()-1;
SET @end_time = GETDATE();
SET @from_lsn = sys.fn_cdc_map_time_to_lsn(/*<relational_operator>*/'smallest greater than', @begin_time);
-- the parameter <relational_operator> may have two values: SMALLEST GREATER THAN OR EQUAL or SMALLEST GREATER THAN

SET @to_lsn = sys.fn_cdc_map_time_to_lsn(/*<relational_operator>*/'largest less than or equal', @end_time);
-- the parameter <relational_operator> may have two values: LARGEST LESS THAN OR EQUAL or LARGEST LESS THAN

SELECT * 
FROM cdc.fn_cdc_get_all_changes_Person_Person(@from_lsn, @to_lsn, /*<row_filter_options>*/'all update old');
-- the parameter <row_filter_options> may have three values: ALL and ALL UPDATE OLD

-- ===========================
-- checking get_net funtion --
-- ===========================

USE AdventureWorks2014
GO
-- function net change - Used for capture the last value of the data, dispensing information of the update and delete statements
DECLARE @begin_time datetime,
@end_time datetime,
@from_lsn binary(10),
@to_lsn binary(10);

SET @begin_time = GETDATE() -1;
SET @end_time = GETDATE();
SET @from_lsn = sys.fn_cdc_map_time_to_lsn(/*<relational_operator>*/'smallest greater than', @begin_time);
-- the parameter <relational_operator> may have two values: SMALLEST GREATER THAN OR EQUAL or SMALLEST GREATER THAN

SET @to_lsn = sys.fn_cdc_map_time_to_lsn(/*<relational_operator>*/'largest less than or equal', @end_time);
-- the parameter <relational_operator> may have two values: LARGEST LESS THAN OR EQUAL or LARGEST LESS THAN

SELECT * 
FROM cdc.fn_cdc_get_net_changes_Person_Person(@from_lsn, @to_lsn, /*<row_filter_options>*/'all with mask');
-- the parameter <row_filter_options> may have three values: ALL, ALL WITH MASK (return the final LSN of the row) and ALL WITH MERGE

Primeiramente execute os comandos abaixo que irão Inserir e Deletar alguns registros na tabela Person.Person e, também alguns Updates nos seus dados:

USE AdventureWorks2014
GO

-- ====================================
-- ----- insert operation test --------
-- ====================================

INSERT INTO [Person].[BusinessEntity]
 ([rowguid]
 ,[ModifiedDate])
VALUES ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50011',GETDATE()), 
 ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50081',GETDATE()),
 ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50082',GETDATE()),
 ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50083',GETDATE()),
 ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50084',GETDATE()),
 ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50085',GETDATE()),
 ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50086',GETDATE()),
 ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50090',GETDATE()),
 ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50088',GETDATE()),
 ('0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50089',GETDATE())
GO

INSERT INTO [Person].[Person]
 ([BusinessEntityID]
 ,[PersonType]
 ,[NameStyle]
 ,[Title]
 ,[FirstName]
 ,[MiddleName]
 ,[LastName]
 ,[Suffix]
 ,[EmailPromotion]
 ,[AdditionalContactInfo]
 ,[Demographics]
 ,[rowguid]
 ,[ModifiedDate])
VALUES (20778,'IN',0,NULL,'Arthur','J','Luz',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50011',GETDATE()),
 (20779,'IN',0,NULL,'Heitor','D','Luz',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50081',GETDATE()),
 (20780,'IN',0,NULL,'Ulisses','L','Silva Neto',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50082',GETDATE()),
 (20781,'IN',0,NULL,'Raquel','F','Nascimento',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50083',GETDATE()),
 (20782,'IN',0,NULL,'Miguel','C','Luz',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50084',GETDATE()),
 (20783,'IN',0,NULL,'Rafael','M','Guedes',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50085',GETDATE()),
 (20784,'IN',0,NULL,'Marcelo','F','Moreira',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50086',GETDATE()),
 (20785,'IN',0,NULL,'Ana','L','Dos Santos',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50090',GETDATE()),
 (20786,'IN',0,NULL,'Leticia','B','Mendes',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50088',GETDATE()),
 (20787,'IN',0,NULL,'Lucas','A','Cruz',NULL,1,NULL,NULL,'0C7D8F81-D7B1-4CF0-9C0A-4CD8B6B50089',GETDATE())
GO

-- ====================================
-- ----- delete operation test --------
-- ====================================

DELETE FROM [Person].[Person]
WHERE [BusinessEntityID] IN (
 20785,
 20786,
 20787
)
GO

-- ====================================
-- ----- update operation test --------
-- ====================================

UPDATE [Person].[Person] 
SET [FirstName] = 'William',
 [LastName] = 'Golçalves',
 [MiddleName] = 'D'
WHERE [BusinessEntityID] = 20783
GO

UPDATE [Person].[Person] 
SET [Title] = 'Sr.'
WHERE [BusinessEntityID] = 20778
GO

UPDATE [Person].[Person] 
SET [PersonType] = 'SC'
WHERE [BusinessEntityID] = 20782
GO

UPDATE [Person].[Person]
SET [PersonType] = 'SC',
    [Title] = 'Sr.'
WHERE [BusinessEntityID] IN (1,2,3,4)
GO

Para facilitar o entendimento, para cada uma das opções de @row_filter_option de cada uma das TVFs, você usará um dos códigos abaixo (partindo do pressuposto de que o ambiente já foi criado seguindo os passos anteriores do conjunto de posts):

[fn_cdc_get_all_changes_…] – Value ALL

Usado para retornar todos registros da tabela de histórico do CDC eliminando os registros que possuem o valor 3 no campo [__$operation] pois estes registros são os valores anteriores ao valor atual do update (vide Parte 3 do conjunto de posts).

USE AdventureWorks2014
GO

-- get_all_changes with row filter options equal all
DECLARE @begin_time datetime, 
 @end_time datetime, 
 @from_lsn binary(10), 
 @to_lsn binary(10);

SET @begin_time = GETDATE()-1;
SET @end_time = GETDATE();
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time); 
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @end_time);

SELECT * 
FROM cdc.fn_cdc_get_all_changes_Person_Person(@from_lsn, @to_lsn, 'all'); 
GO

01

[fn_cdc_get_all_changes_…] – Value ALL UPDATE OLD

Usado para retornar todos registros da tabela de histórico do CDC incluindo os registros que possuem o valor 3 no campo [__$operation].

USE AdventureWorks2014
GO

-- get_all_changes with row filter options equal all update old
DECLARE @begin_time datetime, 
 @end_time datetime, 
 @from_lsn binary(10), 
 @to_lsn binary(10);

SET @begin_time = GETDATE()-1;
SET @end_time = GETDATE();
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time); 
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @end_time);

SELECT * 
FROM cdc.fn_cdc_get_all_changes_Person_Person(@from_lsn, @to_lsn, 'all update old'); 
GO

02


[fn_cdc_get_net_changes_…] – Value ALL

Retorna somente a ultima atualização para cada um dos registros da tabela de histórico. Por exemplo, caso um registro tenha sido inserido, sofrido um update e depois deletado, a query abaixo irá mostrar apenas o registro deletado.

A coluna [__$update_mask] aparece com o valor NULL.

USE AdventureWorks2014
GO

-- get_all_changes with row filter options equal all
DECLARE @begin_time datetime, 
 @end_time datetime, 
 @from_lsn binary(10), 
 @to_lsn binary(10);

SET @begin_time = GETDATE()-1;
SET @end_time = GETDATE();
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time); 
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @end_time);

SELECT * 
FROM cdc.fn_cdc_get_net_changes_Person_Person(@from_lsn, @to_lsn, 'all'); 
GO

03

[fn_cdc_get_net_changes_…] – Value ALL WITH MASK

Retorna somente a ultima atualização para cada um dos registros da tabela de histórico assim como na situação anterior, porém, nesse caso, quando a coluna [__$operation] retornar o valor 4 (novo valor de um update), temos a coluna [__$update_mask] preenchida com a máscara da ultima atualização.

Você entenderá melhor sobre a utilização dessa coluna mais abaixo neste post.

USE AdventureWorks2014
GO

-- get_all_changes with row filter options equal all with mask
DECLARE @begin_time datetime, 
 @end_time datetime, 
 @from_lsn binary(10), 
 @to_lsn binary(10);

SET @begin_time = GETDATE()-1;
SET @end_time = GETDATE();
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time); 
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @end_time);

SELECT * 
FROM cdc.fn_cdc_get_net_changes_Person_Person(@from_lsn, @to_lsn, 'all with mask'); 
GO

04

[fn_cdc_get_net_changes_…] – Value ALL WITH MERGE

Retorna somente a ultima atualização para cada um dos registros da tabela de histórico, porém realiza um merge entre os registros Inseridos e Atualizados mostrando, neste caso, para ambas as situações, a coluna [__$operation] com valor igual a 5. O valor igual a 1 é usado pra deleção.

USE AdventureWorks2014
GO

--delete registry existing before cdc initialization
ALTER TABLE [Person].[EmailAddress]
DROP CONSTRAINT [FK_EmailAddress_Person_BusinessEntityID]
GO

ALTER TABLE [HumanResources].[Employee]
DROP CONSTRAINT [FK_Employee_Person_BusinessEntityID]
GO

ALTER TABLE [Person].[Password]
DROP CONSTRAINT [FK_Password_Person_BusinessEntityID]
GO

ALTER TABLE [Person].[PersonPhone]
DROP CONSTRAINT [FK_PersonPhone_Person_BusinessEntityID]
GO

DELETE FROM [Person].[Person]
WHERE [BusinessEntityID] IN (10,11,12)
GO
-- get_all_changes with row filter options equal all with merge
DECLARE @begin_time datetime, 
 @end_time datetime, 
 @from_lsn binary(10), 
 @to_lsn binary(10);

SET @begin_time = GETDATE()-1;
SET @end_time = GETDATE();
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time); 
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @end_time);

SELECT * 
FROM cdc.fn_cdc_get_net_changes_Person_Person(@from_lsn, @to_lsn, 'all with merge'); 
GO

Capture

Observer que para os registros de [BusinessEntityID] igual a 20785, 20786 e 20787 da tabela [Person].[Person] que foram inseridos hoje e também deletados hoje, ao realizar a operação de merge de acordo com o Range de data que foi passado (de ontem para hoje), o CDC desconsidera a operação de INSERT e nem o apresenta no retorno do SELECT. É como se esse registro não houvesse sido inserido.

Porém, para os registros 10, 11 e 12 que já faziam parte da tabela, ele mostra no select para a coluna [__$operation] o valor 1.


A coluna [__$update_mask] é responsável por “armazena uma máscara de bits com base nos ordinais de coluna da tabela de alteração que identificam as colunas que foram alteradas” (BOL).

Em outras palavras, através dela é possível descobrir quais colunas foram alteradas dentro de uma instrução de UPDATE. Caso a instrução seja de DELETE ou INSERT todas as colunas serão apresentadas.

No código será possível verificar quais registros, de acordo com o range de data fornecido, sofreram alterações de UPDATE ([__$operation] = 4) na coluna [Title] da tabela [Person].[Person]:

-- =======================================
-- understand the column __$update_mask --
-- =======================================

DECLARE @StartDate datetime = GETDATE()-1;
DECLARE @EndDate datetime = GETDATE();
DECLARE @begin_lsn BINARY(10); 
DECLARE @end_lsn BINARY(10);

-- Set the LSN values
SELECT @begin_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @StartDate);
SELECT @end_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @EndDate);

-- get all updated rows where Title was updated
SELECT *
FROM cdc.fn_cdc_get_all_changes_Person_Person(@begin_lsn, @end_lsn, 'all')
WHERE __$operation = 4 
 AND sys.fn_cdc_is_bit_set (
 sys.fn_cdc_get_column_ordinal ('Person_Person','Title'),__$update_mask
 ) = 1
GO

Capture2

Já usando o código abaixo será possivel identificar quais colunas foram alteradas para cada um dos registros da tabela de histórico do CDC capturando somente aqueles que possuem valor de [__$operation] = 4 (todos os valores de UPDATE).

-- =======================================
-- understand the column __$update_mask --
-- =======================================

-- capture all columns changed in each transaction
SELECT ( SELECT CC.column_name + ','
 FROM cdc.captured_columns CC
 INNER JOIN cdc.change_tables CT ON CC.[object_id] = CT.[object_id]
 WHERE capture_instance = 'Person_Person'
 AND sys.fn_cdc_is_bit_set(CC.column_ordinal,
 PD.__$update_mask) = 1
 FOR
 XML PATH('')
 ) AS ChangedColumns,
 *
FROM cdc.Person_Person_CT PD
WHERE PD.__$operation = 4
GO

Capture3


No proximo post da série será possível aprender sobre as melhores práticas e observações para a utilização do Change Data Capture. Após isso, iniciaremos a etapa de construção de ETLs para consumo da feature em cargas de dados de Data Warehouse (DWs) e/ou Operational Data Stores (ODSs).

Deixe um comentário

Preencha os seus dados abaixo ou clique em um ícone para log in:

Logotipo do WordPress.com

Você está comentando utilizando sua conta WordPress.com. Sair /  Alterar )

Foto do Google

Você está comentando utilizando sua conta Google. Sair /  Alterar )

Imagem do Twitter

Você está comentando utilizando sua conta Twitter. Sair /  Alterar )

Foto do Facebook

Você está comentando utilizando sua conta Facebook. Sair /  Alterar )

Conectando a %s