Advanced Real-time Replication and Analytics
One of the key features of the Senzing SDK is its support for passing changes to downstream systems for analytics and/or replication. The fundamentals of achieving this are straightforward.
Examples and method names herein are for the Python SDK and will be different for the other SDK languages.
AFFECTED_ENTITIES Response Document
SZ_WITH_INFO Flag
Methods in the SDKs that can modify entity resolution outcomes (such as adding, deleting, processing redo, and reevaluating) have an optional flags argument. When it is not used, these methods don't return any information. Specifying the SZ_WITH_INFO flag causes them to return a JSON document outlining whether any entities were affected by the operation.
An example of requesting information from add_record() and the associated with-info result:
...
result = sz_engine.add_record(
    data_source_code,
    record_id,
    record_definition,
    flags=SzEngineFlags.SZ_WITH_INFO)
...
{
    "DATA_SOURCE": "TEST",
    "RECORD_ID": "1",
    "AFFECTED_ENTITIES": [
        {
            "ENTITY_ID": 1
        },
        {
            "ENTITY_ID": 35
        }
    ]
}
AFFECTED_ENTITIES is the list of entity IDs impacted by the method. This allows you to create an eventually consistent view of the entities and relationships that Senzing has built, even in a highly asynchronous, parallel processing system.
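As a minimal sketch, assuming the with-info response is returned as a JSON string and that handle_affected_entity() is a hypothetical hand-off to your downstream processing, the affected entity IDs can be extracted like this:

import json

# Parse the with-info response and hand each affected entity ID
# to downstream processing.
info = json.loads(result)
for affected in info.get("AFFECTED_ENTITIES", []):
    handle_affected_entity(affected["ENTITY_ID"])  # hypothetical hand-off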
What to do with the affected entities?
There are a few key steps in processing:
- You can have multiple threads or processes handling the affected entities, but you must ensure you aren't processing the same entity ID in parallel.
- Call get_entity_by_entity_id() for each entity ID to determine the current state of the entity and its relationships. Often all you need are the data sources and record IDs, depending on how you are using the information (a sketch follows this list).
  - If the entity doesn't exist, it was moved or deleted from Senzing and you should reflect that in your processing.
  - If the entity does exist, determine if any changes need to be addressed in your processing.
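A minimal sketch of that loop, assuming v4-style imports and exception names (verify against your installed SDK version) and hypothetical delete_entity_downstream() and upsert_entity_downstream() helpers:

import json

from senzing import SzNotFoundError  # assumption: v4-style import; check your SDK

def process_affected_entity(sz_engine, entity_id):
    # Bring the downstream view of a single entity up to date.
    try:
        entity_json = sz_engine.get_entity_by_entity_id(entity_id)
    except SzNotFoundError:
        # The entity was moved or deleted from Senzing; reflect that downstream.
        delete_entity_downstream(entity_id)  # hypothetical helper
        return
    entity = json.loads(entity_json)
    # The entity still exists; overlay its current state downstream.
    upsert_entity_downstream(entity_id, entity)  # hypothetical helper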
Performance
This can be an incredibly fast process if you:
- Only ask for precisely the information you need from get_entity_by_entity_id(); the more information you ask for, the more intensive the workload will be (see the flags sketch after this list).
- If you intend to replicate the data from the entities and records, land or keep the records in your downstream system outside of Senzing in parallel when you load the records into Senzing. For instance, if you have a data warehouse with person records, add an entity ID column and a relationship table so you can efficiently overlay the Senzing entity and relationship maps rather than having to push and transform all the data too.
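As a sketch of the first point, assuming the v4 flag names mirror the v3 G2 flags (verify the exact constants against your installed SDK), you might request only the entity's record keys:

from senzing import SzEngineFlags  # assumption: v4-style import

# Assumed flag name mirroring v3's G2_ENTITY_INCLUDE_RECORD_DATA; this returns
# the entity's records without the heavier feature and relationship detail.
minimal_flags = SzEngineFlags.SZ_ENTITY_INCLUDE_RECORD_DATA
entity_json = sz_engine.get_entity_by_entity_id(entity_id, flags=minimal_flags)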
If you want to further improve efficiency, note that there are often multiple changes to the same entities within a small period of time. Some customers, especially ones replicating to systems that prefer data in batches (e.g., Elasticsearch), will create an app that collects the affected entities, de-duplicates them over a period of time (5-10 minutes), and then processes them directly or passes them to a downstream system.
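A minimal sketch of that pattern, assuming a single consumer thread and a hypothetical flush_to_downstream() hand-off:

import time

FLUSH_INTERVAL_SECONDS = 300  # flush the distinct set every 5 minutes

pending_entity_ids = set()
last_flush = time.monotonic()

def on_affected_entities(affected):
    # Collect distinct entity IDs so repeated changes to the same entity
    # are processed once per window.
    global last_flush
    for item in affected:
        pending_entity_ids.add(item["ENTITY_ID"])
    if time.monotonic() - last_flush >= FLUSH_INTERVAL_SECONDS:
        flush_to_downstream(set(pending_entity_ids))  # hypothetical hand-off
        pending_entity_ids.clear()
        last_flush = time.monotonic()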
Examples
These examples are for Senzing v3; Senzing v4 examples are coming soon. In the meantime, use these as a guide and reference.
Elasticsearch
This example is succinct and simple; the others hook into a "listener framework" that has additional requirements on your Senzing solution. This one simply has classes that transform a get_entity_*() response into something useful to Elasticsearch, which you would adapt to your use case. It then provides an example Java function that calls the Senzing bulk export method and performs bulk inserts into Elasticsearch, and finally offers some thoughts on using Kibana to search.
https://github.com/Senzing/elasticsearch
Risk Score Calculator
This process performs analytics on entities looking for interesting data quality characteristics, such as entities with more than one tax ID or a tax ID shared by more than one entity. The process manages a table with one row for every entity in Senzing.
- When an entity is affected and no longer exists, it is removed from the table.
- When an entity is affected and does exist, it is added or updated in the table.
https://github.com/Senzing/risk-score-calculator
Neo4J Connector
This process assumes the records already exist in Neo4J; the connector simply creates entity nodes with relationships to the records in Neo4J and between the entities.
- When an entity is affected and no longer exists, it is removed from Neo4J.
- When an entity is affected and does exist, it is added or updated in Neo4J. Note that, for simplicity, it currently completely rewrites the entity rather than identifying deltas.
https://github.com/Senzing/connector-neo4j
Why doesn’t SZ_WITH_INFO tell you the changes?
- The Senzing SDK operates completely in parallel and, given how multi-processing works, you can't know that the operation that returns first actually impacted the system first. For instance, if you called add_record(record A, flags=SzEngineFlags.SZ_WITH_INFO) and add_record(record B, flags=SzEngineFlags.SZ_WITH_INFO) at the same time, they could both impact the same entities, but you could not reasonably derive which order they occurred in. One could cause the creation of an entity and the other a delete; processing them in the wrong order will cause you to be out of sync with the source.
- Why doesn't Senzing provide a sequential ID that tells you the order of all the changes? Even if this could be done without dramatically harming the performance of the system, it would put the burden on you to make sure all the changes were applied in order and to determine how to deal with missed deltas. This would make it even harder for you to parallelize downstream processing.
If you have any questions, contact Senzing Support. Support is 100% FREE!