Version: 2.14.X

Generating Data

When creating a model, it's always important to test and validate it before deploying it in the real world. If you're lucky, you may already have real data to test with. However, some types of real-world data are hard to come by. In those cases, generating synthetic data is essential for model testing and validation.

This document reviews some basic data generation techniques that Cogility uses, focusing on Python and its related tools.

Installing Tools

This section describes how to install the tools Cogility typically uses for generating test data.

Python

Python is the most popular programming language in data science, and therefore has a rich selection of established data libraries to leverage for data generation purposes.

You can download the latest version of Python from https://www.python.org/downloads/.

Jupyter Notebook

Once Python is installed on your system, you can start installing libraries. An easy way to do this is with pip, Python's package installer.

In the terminal, run the following command:

pip install jupyterlab

Once JupyterLab is installed, run jupyter lab in the terminal, and a Jupyter editing environment automatically opens in your default browser.

Necessary Libraries

The following are some libraries that will be used throughout the guide.

Faker

Faker is a tool that allows users to generate randomized but realistic fake data for testing or demo purposes. We will use this library to generate our dataset in this guide.

If you're installing libraries using the terminal, run pip install faker.

You can also install Faker directly through a Python kernel in a Jupyter notebook by running the following:

import sys
!{sys.executable} -m pip install faker

Pandas

Pandas is a powerful data analysis and manipulation tool that is essential for any project involving data.

To install through the terminal, run pip install pandas.

To install using a Jupyter notebook, run the following in a Python kernel:

import sys
!{sys.executable} -m pip install pandas

Data Formatting

Cogynt currently only supports processing data in JSON format. Some special attention must be paid to the datetime, geo coordinate, and geo polygon datatypes in order for Cogynt to process them accurately.

Datetime

Dates and times must be in Zulu format: '%Y-%m-%dT%H:%M:%S.%fZ'.

tip

2019-08-14T23:51:34.851483Z

Preprocess all datetime fields to be in Zulu format before using them in Cogynt.
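One way to check whether a field is already in Zulu format is to parse it back with the same format string. The following is a minimal sketch using only the standard library; the helper name is_zulu is hypothetical and not part of Cogynt.

```python
from datetime import datetime

ZULU_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'

def is_zulu(value):
    """Return True if value parses with the Zulu format string above."""
    try:
        datetime.strptime(value, ZULU_FORMAT)
        return True
    except (TypeError, ValueError):
        # TypeError: value is not a string; ValueError: wrong layout
        return False

print(is_zulu('2019-08-14T23:51:34.851483Z'))  # True
print(is_zulu('2019-08-14'))                   # False
```

A check like this can be run over every datetime field before the data is handed to Cogynt.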

Geo Coordinate

Coordinates should be expressed in the typical GeoJSON format, as follows:

{
  "type": "Feature",
  "geometry": {
    "type": "Point",
    "coordinates": [125.6, 10.1]
  },
  "properties": {
    "name": "Dinagat Islands"
  }
}

Geo Polygon

Geo polygons should be expressed using GeoJSON format. For example:

{
  "type": "Feature",
  "properties": {},
  "geometry": {
    "type": "Polygon",
    "coordinates": [
      [
        [-125.068359375, 48.574789910928864],
        [-117.861328125, 32.47269502206151],
        [-79.98046875, 25.403584973186703],
        [-67.236328125, 44.77793589631623],
        [-125.068359375, 48.574789910928864]
      ]
    ]
  }
}
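Note that the first and last positions in the example ring are identical. GeoJSON requires each polygon ring to be closed in this way and to contain at least four positions. The sketch below builds a polygon feature from a list of corner points and closes the ring when needed. The helper names (close_ring, is_closed_ring, polygon_feature) are hypothetical, and the corner values are taken from the example above.

```python
def close_ring(points):
    """Return a copy of points with the first position repeated at the end if needed."""
    ring = [list(p) for p in points]
    if ring and ring[0] != ring[-1]:
        ring.append(list(ring[0]))
    return ring

def is_closed_ring(ring):
    """A ring needs at least four positions, and the first must equal the last."""
    return len(ring) >= 4 and list(ring[0]) == list(ring[-1])

def polygon_feature(points):
    """Build a GeoJSON Feature with a single-ring Polygon geometry."""
    ring = close_ring(points)
    if not is_closed_ring(ring):
        raise ValueError('a polygon ring needs at least three corners')
    return {
        'type': 'Feature',
        'properties': {},
        'geometry': {'type': 'Polygon', 'coordinates': [ring]},
    }

# Corner points in [longitude, latitude] order
corners = [
    (-125.068359375, 48.574789910928864),
    (-117.861328125, 32.47269502206151),
    (-79.98046875, 25.403584973186703),
    (-67.236328125, 44.77793589631623),
]
feature = polygon_feature(corners)
print(len(feature['geometry']['coordinates'][0]))  # 5
```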

Generating Sample Data

This section provides an exercise in data generation.

Suppose you are building a model for the U.S. Postal Service that monitors change of address applications. It accepts resident profiles and change of address applications as inputs, and outputs an updated resident history when the event pattern finds a match between an existing resident and an application containing their information. To make sure your model works, you would need a set of residents and a set of address change data to test it.

The following sections outline how to create the sample dataset needed for this project.

Generating Resident Profiles

People are one of the most common entities for analysis, especially in a behavior analytic platform like Cogynt. Libraries like Faker make it easy to generate realistic information about fake people.

Let's start by using Python to generate some random profiles for residents with Faker.

Faker Basics

First, import the library:

from faker import Faker

Then, instantiate a Faker object:

fake = Faker()

Next, use the fake object to call Faker's name function to generate a random name:

fake.name()

Running this command multiple times returns a new and different name every time.

Generating Full Profiles

You can also use Faker to create an entire profile for a single person. Create a data structure with first_name, last_name, dob, and phone_number as fields:

resident = {
    'first_name': fake.first_name(),
    'last_name': fake.last_name(),
    'dob': fake.date(),
    'phone_number': fake.phone_number()
}

print(f'resident profile sample: {resident}')

As the output shows, a data structure called resident was created with the fields first_name, last_name, dob, and phone_number. Faker generated a random value for each field.

This data structure (i.e., the curly braces { } assigned to resident, not the ones inside the print function) is called a dictionary in Python. It stores data in key-value pairs. In this case, each key is a field name (e.g., first_name) and each value is what the corresponding built-in Faker function generated.
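Dictionaries map naturally onto JSON objects, which is why they are a convenient shape for records bound for Cogynt. The following sketch uses fixed, made-up values instead of Faker output so that it can run on its own.

```python
import json

# Hypothetical, hand-written values standing in for Faker output
resident = {
    'first_name': 'Ada',
    'last_name': 'Lovelace',
    'dob': '1815-12-10',
    'phone_number': '555-0100',
}

# Look up a value by its key
print(resident['first_name'])  # Ada

# Serialize the dictionary to a JSON string and back again
as_json = json.dumps(resident)
print(json.loads(as_json) == resident)  # True
```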

Converting Datetime Information

Cogynt requires all datetime data to be in Zulu format for accurate processing. You may have noticed that the date generated in the dob field is formatted as YYYY-MM-DD. Date information commonly arrives as a string in a similar format, often with a time component as well.

To convert date strings and datetime objects to Zulu format, build custom conversion functions:

zulu_format = '%Y-%m-%dT%H:%M:%S.%fZ'

# Convert a date string in YYYY-MM-DD format to Zulu format (midnight)
def date_to_zulu(date):
    return date + 'T00:00:00.000000Z'

# Convert a datetime object to a Zulu-format string
def datetime_to_zulu(dt):
    return dt.strftime(zulu_format)

print(f'convert date to zulu format: {date_to_zulu(fake.date())}')
print(f'convert date-time to zulu format: {datetime_to_zulu(fake.date_time())}')
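The datetime_to_zulu function formats whatever clock time it receives and appends a Z, which is correct when the value is already in UTC. For datetimes that carry a time zone, a conversion to UTC is needed first. The sketch below shows one way to do that with the standard library; the function name aware_datetime_to_zulu and the UTC-7 offset are assumptions chosen for illustration.

```python
from datetime import datetime, timedelta, timezone

ZULU_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'

def aware_datetime_to_zulu(dt):
    """Convert a timezone-aware datetime to a Zulu (UTC) string."""
    if dt.tzinfo is None:
        raise ValueError('expected a timezone-aware datetime')
    return dt.astimezone(timezone.utc).strftime(ZULU_FORMAT)

# A local time seven hours behind UTC (hypothetical example value)
utc_minus_7 = timezone(timedelta(hours=-7))
local = datetime(2019, 8, 14, 16, 51, 34, 851483, tzinfo=utc_minus_7)

print(aware_datetime_to_zulu(local))  # 2019-08-14T23:51:34.851483Z
```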

Now that you have a way to convert dates to the proper Zulu format, go ahead and generate 10 profiles and store them in a list (as denoted by [ ] in Python):

import uuid

num_profiles = 10

resident_profiles = []
for _ in range(num_profiles):
    resident_profiles.append({
        'resident_id': str(uuid.uuid4()),
        'first_name': fake.first_name(),
        'last_name': fake.last_name(),
        'dob': date_to_zulu(fake.date()),
        'phone_number': fake.phone_number(),
        'email': fake.email()
    })

print(f'first entry in profiles list: {resident_profiles[0]}')

note

The library uuid is imported and used to generate resident_id. The library is named after UUID (Universally Unique Identifier), a common way for computer systems to generate numbers to identify information.

For all practical purposes, IDs generated using the uuid standard are unique enough such that anyone can create a UUID and use it to identify something with near certainty, knowing that the identifier does not duplicate one that has already been, or will be, created to identify something else. Information labeled with UUIDs by independent parties can therefore be combined later into a single database or transmitted on the same channel, with a negligible chance of duplication.
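As a quick illustration, the sketch below generates a batch of random (version 4) UUIDs with the standard library and checks that none of them collide. The batch size of 1000 is arbitrary.

```python
import uuid

# Generate a batch of random UUIDs as strings
ids = [str(uuid.uuid4()) for _ in range(1000)]

print(ids[0])                     # a different value on every run
print(len(ids[0]))                # 36 characters, including hyphens
print(len(set(ids)) == len(ids))  # True when there are no duplicates
```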

Generating Address Change Event Data

Now that you have successfully generated some basic resident data, the next step is to create data that represents change of address events. In computer systems, events are recorded with a timestamp, so every generated event record should carry one.

Relevant information for a change of address event includes old_address, new_address, and optionally new_coordinates, the geographic location of the new address.

Create a sample change of address event as follows:

address_change = {
    'old_address': fake.address(),
    'new_address': fake.address(),
    'new_coordinates': fake.latlng()
}

print(f'address change sample record: {address_change}')

Converting Geo Coordinates

Faker provides coordinate information in a tuple, as denoted by ( ) in Python. Cogynt requires geo coordinates to be presented in a specific GeoJSON format. A custom function must be written to convert this latitude-longitude tuple into a GeoJSON. For example:

def latlng_to_geojson(latlng):
    # split tuple values into lat and lng
    lat, lng = latlng
    return {
        'geometry': {
            'coordinates': [float(lng), float(lat)],
            'type': 'Point'
        },
        'type': 'Feature',
        'properties': {'name': 'Point'}
    }

print(f'convert latlng to geo-JSON: {latlng_to_geojson(fake.latlng())}')
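Two details of this conversion are easy to miss: GeoJSON lists longitude before latitude, and the float() calls matter because Faker typically returns coordinates as Decimal values, which Python's json module cannot serialize. The sketch below repeats the conversion function so that it runs on its own, and uses a fixed Decimal tuple (an assumed stand-in for fake.latlng() output).

```python
import json
from decimal import Decimal

def latlng_to_geojson(latlng):
    # Same conversion as in the guide, repeated here for self-containment
    lat, lng = latlng
    return {
        'geometry': {'coordinates': [float(lng), float(lat)], 'type': 'Point'},
        'type': 'Feature',
        'properties': {'name': 'Point'},
    }

# Stand-in for fake.latlng(): (latitude, longitude) as Decimal values
sample = (Decimal('10.1'), Decimal('125.6'))

feature = latlng_to_geojson(sample)
print(feature['geometry']['coordinates'])  # [125.6, 10.1]

# The converted feature serializes cleanly
serialized = json.dumps(feature)

# The raw Decimal values do not
try:
    json.dumps({'coordinates': list(sample)})
    decimal_serializable = True
except TypeError:
    decimal_serializable = False
print(decimal_serializable)  # False
```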

Relating Residents to Events Using resident_id

Another important thing to include in the address change record is its owner (i.e., the resident who submitted the change of address form). Update the data structure by adding resident_id as a field:

sample_resident_id = resident_profiles[0]['resident_id']

address_change = {
    'resident_id': sample_resident_id,
    'old_address': fake.address(),
    'new_address': fake.address(),
    'new_coordinates': latlng_to_geojson(fake.latlng())
}

print(f'address change sample record: {address_change}')

Next, generate five address change event records for each resident. Give each record a unique ID and a timestamp as well:

num_records_per_resident = 5
address_change_records = []

for resident in resident_profiles:
    for _ in range(num_records_per_resident):
        address_change_records.append({
            'record_id': str(uuid.uuid4()),
            'resident_id': resident['resident_id'],
            'timestamp': datetime_to_zulu(fake.date_time()),
            'old_address': fake.address(),
            'new_address': fake.address(),
            'new_coordinates': latlng_to_geojson(fake.latlng())
        })

print(f'total number of address change records: {len(address_change_records)}')
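Because each event refers to a resident through resident_id, it can be useful to confirm that every record points at a profile that actually exists. The sketch below shows one such check on small, hand-written sample data; the function name find_orphan_records and the sample IDs are hypothetical.

```python
def find_orphan_records(profiles, records):
    """Return the records whose resident_id matches no profile."""
    known_ids = {profile['resident_id'] for profile in profiles}
    return [r for r in records if r['resident_id'] not in known_ids]

# Hypothetical sample data standing in for the generated lists
sample_profiles = [{'resident_id': 'r-1'}, {'resident_id': 'r-2'}]
sample_records = [
    {'record_id': 'e-1', 'resident_id': 'r-1'},
    {'record_id': 'e-2', 'resident_id': 'r-2'},
    {'record_id': 'e-3', 'resident_id': 'r-9'},  # no matching profile
]

orphans = find_orphan_records(sample_profiles, sample_records)
print([r['record_id'] for r in orphans])  # ['e-3']
```

With the data generated in this guide, the same function should return an empty list.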

Using Pandas DataFrames

It's worth checking that the event records were generated correctly, but 50 records is quite a lot of information to print out and read in plain text. The solution is to use the pandas library.

Pandas is a powerful open-source data analysis and manipulation library that is also well integrated with Jupyter notebook. Pandas stores data in a special data structure called a "DataFrame," which makes the data easier to manipulate and review.

Create a DataFrame as follows:

import pandas as pd

address_change_df = pd.DataFrame(address_change_records)

address_change_df

The pandas DataFrame takes the list of dictionaries and arranges them neatly into a table.

Sorting by timestamp

Since the event data was generated by looping through resident_profiles, the data is initially arranged in the same order in which the residents were sorted (i.e., the order in which the resident_profiles were generated). In order to make this data more realistic, the data should be sorted by timestamp.

Pandas has a tool for easily doing this:

address_change_df = address_change_df.sort_values('timestamp').reset_index(drop=True)
address_change_df

After running these commands, all events are sorted by time of occurrence.
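Sorting the timestamp column as plain strings works because Zulu-formatted values have a fixed width and run from the largest unit (year) to the smallest (microsecond), so alphabetical order matches chronological order. The sketch below demonstrates the same idea without pandas, on hand-written sample records.

```python
# Hypothetical sample records with Zulu-formatted timestamps
records = [
    {'record_id': 'c', 'timestamp': '2021-05-01T08:00:00.000000Z'},
    {'record_id': 'a', 'timestamp': '1999-12-31T23:59:59.999999Z'},
    {'record_id': 'b', 'timestamp': '2010-01-15T12:30:00.000000Z'},
]

# Plain string comparison is enough when every value shares the same format
ordered = sorted(records, key=lambda record: record['timestamp'])

print([record['record_id'] for record in ordered])  # ['a', 'b', 'c']
```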

With that, you have learned to use some basic tools to help generate a randomized dataset!

Customizing Dataset to Patterns

Is this dataset good enough to test your patterns? It depends on the pattern. You must ask yourself two questions:

  • What kinds of behaviors do my patterns look for?
  • Are those behaviors present in my data?

If your pattern simply looks for residents that have moved in the last five years, this dataset will test the accuracy of your pattern sufficiently, because it contains events that happened within five years as well as events that happened more than five years ago. (In other words, it contains data to validate cases that both should and should not match the pattern.) If the result of the pattern generates a new notification event for an input event that happened more than five years ago, you know that there is something wrong in the pattern.
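To confirm that a dataset really contains both kinds of events, the timestamps can be split around a five-year cutoff. The following is a rough sketch: the function name split_by_cutoff is hypothetical, the reference date is fixed for repeatability, and a year is approximated as 365 days.

```python
from datetime import datetime, timedelta

ZULU_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'

def split_by_cutoff(timestamps, now, years=5):
    """Split Zulu timestamp strings into (recent, older) around a cutoff."""
    cutoff = now - timedelta(days=365 * years)  # approximation, ignores leap days
    recent, older = [], []
    for value in timestamps:
        parsed = datetime.strptime(value, ZULU_FORMAT)
        (recent if parsed >= cutoff else older).append(value)
    return recent, older

# Hypothetical sample timestamps and a fixed reference date
sample = ['2022-06-01T00:00:00.000000Z', '2010-03-15T12:00:00.000000Z']
recent, older = split_by_cutoff(sample, now=datetime(2024, 1, 1))

print(len(recent), len(older))  # 1 1
```

If either list comes back empty, the dataset cannot validate both the matching and the non-matching case.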

However, if you want to test a pattern that generates a new notification when a resident has moved two or more times, this dataset cannot test it thoroughly. Because five moving events were generated for every resident, every resident is expected to match, and no resident is left to validate the non-matching case.

To remedy this, adjust the number of moving events per resident so that some residents have moved two or more times and some have moved only once. The adjustment can be as simple as generating one moving event for each of the first five residents in resident_profiles and five moving events for each of the remaining residents. A more elegant approach is to randomize the number of moving events per resident using Python's random library.

Modify the original address change event generator so that it generates a random number of records between 1 and 5 (inclusive) for each resident:

import random

address_change_records = []

for resident in resident_profiles:
    for _ in range(random.randint(1, 5)):
        address_change_records.append({
            'record_id': str(uuid.uuid4()),
            'resident_id': resident['resident_id'],
            'timestamp': datetime_to_zulu(fake.date_time()),
            'old_address': fake.address(),
            'new_address': fake.address(),
            'new_coordinates': latlng_to_geojson(fake.latlng())
        })

print(f'total number of address change records: {len(address_change_records)}')

Count the number of events each resident has, so that you will know which residents you expect to match your pattern, and which residents you expect to be absent from your results. Pandas is a great tool to do this quickly in a single line of code:

address_change_records = pd.DataFrame(address_change_records)
address_change_records.groupby('resident_id').size()

In the generated data, you want to see:

  • A few residents that have only one event.
  • At least one resident that has exactly two events.
  • Some residents that have more than two events.

If the generated data does not meet these criteria, regenerate it until it does. The idea is that your dataset should contain all cases, including any edge cases you can think of (such as having exactly two events for a filter that looks for two or more events).
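The regenerate-until-it-fits step can also be automated. The sketch below draws a random event count for each resident and retries until all three cases are present. The function names and the retry limit are assumptions, and the check only requires at least one resident in each category.

```python
import random

def covers_all_cases(event_counts):
    """Check for a resident with one event, one with exactly two, and one with more."""
    counts = list(event_counts)
    return (any(c == 1 for c in counts)
            and any(c == 2 for c in counts)
            and any(c > 2 for c in counts))

def draw_event_counts(resident_ids, max_attempts=1000):
    """Draw 1 to 5 events per resident, retrying until every case is covered."""
    for _ in range(max_attempts):
        counts = {rid: random.randint(1, 5) for rid in resident_ids}
        if covers_all_cases(counts.values()):
            return counts
    raise RuntimeError('could not cover all cases; try more residents')

# Hypothetical resident IDs standing in for the generated profiles
resident_ids = [f'resident-{n}' for n in range(10)]
event_counts = draw_event_counts(resident_ids)
print(event_counts)
```

The resulting counts can then drive the inner loop of the event generator in place of the direct random.randint call.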

It is important to customize your dataset to target matching and non-matching cases in your pattern, and to know what should and should not be expected in your output results.

Saving and Loading

You may want to save your data in order to use it later.

Start by setting the destination folder file_directory:

# Enter your desired directory here
file_directory = './'

In this exercise, we have generated two types of data:

  • resident_profiles, which is in the form of a list of dictionaries.
  • address_change_records, which is in a pandas DataFrame.

You can save this data using Pandas or using the JSON library.

Saving Using Pandas

Let's start with address_change_records. Because it is already a pandas DataFrame, the simplest way to save it to a JSON file is the DataFrame's to_json method:

address_change_records.to_json(file_directory + 'address_change_records.json', orient='records')

Well done! You should now be able to find the address_change_records.json file in your directory.

Saving Using JSON Library

The other way of saving JSON files is to use the JSON library that comes standard with your Python installation.

Let's use it to save resident_profiles to a JSON file:

import json

with open(file_directory + 'resident_profiles.json', 'w') as f:
    json.dump(resident_profiles, f)

tip

You can also convert your list of dictionaries to a Pandas DataFrame before saving the file to JSON as follows:

pd.DataFrame(resident_profiles).to_json(...)

Reading from JSON Files

To read your file to a Pandas DataFrame, do the following:

pd.read_json(file_directory + 'resident_profiles.json', orient='records')

To read using the standard JSON library, do the following:

with open(file_directory + 'address_change_records.json', 'r') as f:
    address_change_records = json.load(f)

address_change_records
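A simple way to gain confidence in the save and load steps is to write a small dataset, read it back, and compare. The sketch below does this in a temporary directory so that it leaves no files behind. The sample records are hypothetical, and pathlib is used to join the path rather than string concatenation.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical sample data standing in for resident_profiles
records = [
    {'resident_id': 'r-1', 'first_name': 'Ada'},
    {'resident_id': 'r-2', 'first_name': 'Grace'},
]

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / 'resident_profiles.json'

    # Save the list of dictionaries as JSON
    with open(path, 'w') as f:
        json.dump(records, f)

    # Load it back into a new list
    with open(path, 'r') as f:
        loaded = json.load(f)

print(loaded == records)  # True
```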

Working with Kafka

This section provides a quick overview of publishing data to, and reading data from, your Kafka cluster.

Kafka is often described as a message publication and subscription system. It is generally used in distributed systems, such as in the cloud, to facilitate distributed data storage and computation. Due to its distributed nature, data is often configured to be replicated over all Kafka nodes for redundancy, making Kafka a robust form of data storage as well.

The object responsible for writing (or publishing) data into Kafka is called a producer. Kafka stores data in different buckets called topics. Each topic must have a unique name. The object responsible for reading data from Kafka topics is called a consumer. A topic can have one or more consumers reading from it at any given time.

When a message is published to a topic, the consumers that are "subscribed" to this topic are made aware of this change immediately. For this reason, Kafka is an ideal data solution for streaming applications.

note

Although Kafka supports many data formats, Cogynt currently only supports messages in JSON format.

Setup

There are a few libraries that are designed to handle data communication with Kafka in Python. The one used in this guide is kafka-python.

To install kafka-python via your terminal, run pip install kafka-python.

If you wish to install through a Jupyter notebook, run the following code cell:

import sys
!{sys.executable} -m pip install kafka-python

Another library used throughout this guide is Pandas. Pandas is a powerful data analysis and manipulation tool that is essential for any project involving data. For installation instructions, see Pandas.

Finish your setup by importing the necessary modules:

from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
import json
import pandas as pd

Creating Producers

Before creating a producer, you must identify where Kafka is in your system.

This guide gives Kafka's address as kafka-[YOUR_ENVIRONMENT].cogilitycloud.com on port 31090. The example below instead uses kafka.confluent.svc.cluster.local:9071, a Kubernetes cluster-internal service name. Use whichever address is reachable from where your code runs:

# Replace with the Kafka address for your environment
kafka_address = 'kafka.confluent.svc.cluster.local:9071'

Now proceed to create a producer using Python:

producer = KafkaProducer(bootstrap_servers=kafka_address,
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))

Cogynt currently only accepts data in JSON format. The value_serializer setting on KafkaProducer tells the producer how to turn each message into bytes: here, json.dumps converts the supplied Python object to a JSON string, which is then encoded as UTF-8.
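The serialization step can be tried out without a Kafka connection. The sketch below applies the same expression used for value_serializer and then reverses it, which is what a consumer would need to do to recover the original object. The function names serialize and deserialize are hypothetical wrappers added for readability.

```python
import json

def serialize(value):
    # Same expression as the producer's value_serializer
    return json.dumps(value).encode('utf-8')

def deserialize(raw):
    # The reverse operation: decode the bytes, then parse the JSON
    return json.loads(raw.decode('utf-8'))

message = {'name': 'Sean Kenneday', 'city': 'Rancho Santa Margarita', 'state': 'CA'}

raw = serialize(message)
print(type(raw).__name__)           # bytes
print(deserialize(raw) == message)  # True
```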

Now that a producer is created, it can be used to send a message to Kafka.

First, create some data:

data = {
    'name': 'Sean Kenneday',
    'city': 'Rancho Santa Margarita',
    'state': 'CA'
}

Then, create a topic to contain the data. Name this first topic profile:

topic_name = 'profile'

The following code block creates a connection to the configured Kafka cluster and creates an empty topic named profile with a single partition and a replication factor of 1.

note

The partition and replication factor are useful configurations that can be adjusted to optimize processing performance depending on the expected data load and complexity of your application.

client = KafkaAdminClient(bootstrap_servers=kafka_address)
topic = NewTopic(name=topic_name,
                 num_partitions=1,
                 replication_factor=1)
client.create_topics(new_topics=[topic], validate_only=False)

After running the code, an empty topic named profile should have been created. It is now ready to receive data. Go ahead and publish the data to Kafka:

producer.send(topic_name, data);

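Congratulations! Your Kafka message has been sent. Note that send() is asynchronous: it queues the message and returns immediately. Calling producer.flush() blocks until all queued messages have been delivered.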

Two things happened when you ran the preceding code:

  • A topic called profile was created.
  • The contents of data were sent to this topic.

If a topic with that name already exists in Kafka, the producer appends the data to the topic's existing contents.

Creating Consumers

How do we know that the message from Creating Producers is now in Kafka? Let's take a look at the topic and find out.

First, verify that the topic was indeed created:

consumer = KafkaConsumer(bootstrap_servers=kafka_address)
for topic in consumer.topics():
    print(topic)

The name profile should be among the listed topics.

For convenience, it is helpful to create a function that reads from a topic, such as the following:

def read_topic(topic_name):
    """
    Creates a new Kafka consumer that reads from the beginning of the
    specified topic and returns the contents in a pandas dataframe.
    """
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=kafka_address,
        # start reading from the earliest entry of this topic
        auto_offset_reset='earliest',
        # stop 'listening' to the topic when incoming messages cease
        # for 1000 milliseconds (1 second)
        consumer_timeout_ms=1000,
        # deserialize incoming message bytes to json format
        value_deserializer=lambda x: json.loads(x.decode('utf-8')))

    message_list = []
    for c in consumer:
        message_list.append(c.value)

    return pd.DataFrame(message_list)

Reading from Topics

Next, look into the topic using the read_topic function created in Creating Consumers:

read_topic(topic_name)

You can publish more data entries to the same topic, and the function will retrieve them all.

# Generate more sample profile data
names = ['Lindsay Chase', 'Branden Carter', 'Nathan Landino', 'Aditya Kumakale', 'Edwin Diaz']
cities = ['Mission Viejo', 'Irvine', 'Fullerton', 'Dana Point', 'San Clemente']

for name, city in zip(names, cities):
    # Publish more data to the topic
    producer.send(topic_name, {
        'name': name,
        'city': city,
        'state': 'CA'
    })

# Read from topic
read_topic(topic_name)

Deleting Topics

For security reasons, topics should never be deleted during production. (In fact, Cogynt is designed to shield users from being able to delete data from Kafka.) However, for training and testing purposes, it is convenient to have the ability to start from scratch.

warning

Topic deletion is irreversible. Proceed with caution.

Delete a topic with the following code:

client = KafkaAdminClient(bootstrap_servers=kafka_address)
client.delete_topics([topic_name]);

Multiple topics can be deleted in a single call by passing the delete_topics method a list of topic names:

client.delete_topics(['topic_A', 'topic_B', 'topic_C', 'topic_D'])

Verify whether the topic profile has indeed been deleted with the following code:

consumer = KafkaConsumer(bootstrap_servers=kafka_address)
for topic in consumer.topics():
    print(topic)