Optimizing and Refactoring
This section discusses how to make models run more efficiently. The process includes both optimizing job performance and resources to help the overall system run better, and refactoring models for faster calculations and easier readability.
Optimizing Performance and Resources
In Cogynt, optimization involves managing Flink job performance and resources. Optimization is about maximizing performance while minimizing resources. Performance and resources tend to have a proportional relationship with one another. In general, the higher the performance required, the more resources required to achieve it.
Poor performance optimization can lead to:
- High latency (for example, not receiving results in a timely manner, resulting in lost business opportunities or catastrophic failure).
- Deployments failing unexpectedly under load.
Poor resource optimization can lead to:
- Deployments failing unexpectedly when resources are under-provisioned.
- Unnecessarily high operating costs when resources are over-provisioned.
That being said, one should not assign resources at random to a job and expect an automatic performance boost. The data, pattern constraints, computation complexity, and many other factors affect the way the Flink job performs.
With this principle in mind, the following topic headings outline strategies for optimizing your system's performance.
Modifying Flink Configurations
The parallelism setting is the most direct way for users to boost performance. The higher the parallelism, the more tasks can be scheduled to run at the same time, and the higher the throughput.
Example
A parallelism value of 10 means that ten tasks can be done simultaneously at any given time.
Increasing parallelism, however, proportionately increases the resource overhead.
Kafka topics that the deployment reads from and writes to must also have a sufficient number of partitions for parallel processes to take advantage of. Otherwise, those resources are wasted.
Example
Topics with 10 or 20 partitions are well optimized for a Flink deployment with a parallelism of 10. Therefore, to increase the speed of ingestion, the topic should have ten individual partitions, and they should be read from and operated on simultaneously.
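The relationship between partition count and parallelism can be checked with simple arithmetic. The following is a minimal sketch, assuming partitions are spread as evenly as possible across parallel readers (a simplification of how assignment may work in practice). The function name `partition_spread` is illustrative and is not part of Cogynt or Flink.

```python
def partition_spread(partitions: int, parallelism: int) -> dict:
    """Summarize how topic partitions spread across parallel readers.

    Assumes an even, round-robin style assignment (a simplification).
    """
    if partitions < 1 or parallelism < 1:
        raise ValueError("partitions and parallelism must be positive")
    base, extra = divmod(partitions, parallelism)
    return {
        "busiest_reader": base + (1 if extra else 0),      # most partitions on one reader
        "lightest_reader": base,                           # fewest partitions on one reader
        "idle_readers": max(parallelism - partitions, 0),  # readers with nothing to read
    }


for partitions in (4, 10, 15, 20):
    print(partitions, partition_spread(partitions, parallelism=10))
```

Under this assumption, 10 or 20 partitions give every reader an equal share, 4 partitions leave six readers idle, and 15 partitions load some readers twice as heavily as others.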
Adjusting Task Manager Resources
Task managers are the workers of your job. The more workers you have, the more parallel tasks you can run. The more resources you provision for your task managers, the more efficiently they perform their tasks.
The amount of resources you can allot a single task manager is determined by the type of worker node configured in your cloud provider. (For example, a common Cogynt setup is 32 GB RAM and 8 CPUs.)
Task managers can be split into multiple slots. The number of slots is the number of threads the task manager has available to perform parallel tasks. The managed memory of each task manager is divided equally among its slots. CPU is not divided.
Example
Running a deployment with a parallelism value of 10 requires a total of ten slots. To make ten slots, you can provision either:
- Two task managers with five slots each.
- Five task managers with two slots each.
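The possible layouts can also be enumerated. The following is a rough sketch that lists every combination of task managers and slots supplying exactly the required number of slots, along with the memory each slot would receive. The per-task-manager memory figure (8 GB) is a hypothetical value chosen for illustration, and the function name is not part of Cogynt or Flink.

```python
def slot_layouts(parallelism: int, memory_gb_per_manager: float) -> list[dict]:
    """List task manager / slot combinations that supply exactly `parallelism` slots.

    `memory_gb_per_manager` is a hypothetical memory budget that each task
    manager splits equally among its slots.
    """
    layouts = []
    for task_managers in range(1, parallelism + 1):
        if parallelism % task_managers:
            continue  # slots would not divide evenly across task managers
        slots = parallelism // task_managers
        layouts.append({
            "task_managers": task_managers,
            "slots_each": slots,
            "gb_per_slot": memory_gb_per_manager / slots,
        })
    return layouts


for layout in slot_layouts(parallelism=10, memory_gb_per_manager=8.0):
    print(layout)
```

Fewer slots per task manager leave more memory for each slot, at the cost of provisioning more task managers.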
Using Checkpoints
Checkpoints are performed to save the system's state and safeguard against job failure. In the event of a system failure or other interruption, a checkpoint enables the system to pick up where it left off when it spins up again.
Although checkpoints are helpful, they require resources to record and use. Checkpoints can fail to record if the state that needs saving is sizable. Make sure that the checkpoint timeout and resource allocation are sufficient for your purposes:
- When checkpoints are being saved, all processing stops so that the system can avoid accumulating state in the meantime. You can enable concurrent checkpoints with processing (which may be ideal for some projects), but this setup devotes a portion of resources to creating checkpoints, denying those resources to the jobs that continue to run. Be mindful of the potential for job failure from this reduced resource availability.
- If your checkpoint timeout is too low, it can trap the system in a perpetual loop of failure and restarting. When the checkpoint timeout is too low, you effectively cause the system never to take checkpoints. After timeout, the system abandons its attempt to record the present state, and Flink goes back to a previous checkpoint to try again. However, this creates a situation where the discrepancy between the previous checkpoint and the current state is still too large to record, so that the attempt to record a new checkpoint exceeds the checkpoint timeout and causes the cycle to repeat.
Note
In many cases, it may be unclear when you reach the checkpoint maximum. Pinpointing the checkpoint maximum requires trial and error, and is best learned from acquiring familiarity with your data set as you work with it.
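As a starting point for that trial and error, a crude back-of-the-envelope estimate can indicate whether a timeout is plausibly large enough. The sketch below assumes checkpoint duration is simply state size divided by write throughput, which ignores many real factors; all names and numbers are hypothetical and should be replaced with measurements from your own deployment.

```python
def checkpoint_fits(state_mb: float, write_mb_per_s: float, timeout_s: float) -> bool:
    """Return True if a snapshot of `state_mb` could be written before the timeout."""
    return state_mb / write_mb_per_s <= timeout_s


def minimum_timeout_s(state_mb: float, write_mb_per_s: float, headroom: float = 2.0) -> float:
    """Estimate a timeout that allows `headroom` times the raw write duration."""
    return headroom * state_mb / write_mb_per_s


# Hypothetical figures: 12,000 MB of state written at 50 MB/s takes 240 s.
print(checkpoint_fits(state_mb=12_000, write_mb_per_s=50, timeout_s=120))  # too short
print(minimum_timeout_s(state_mb=12_000, write_mb_per_s=50))               # with 2x headroom
```

If the estimate shows the timeout is shorter than the raw write duration, the failure loop described above is likely, and either the timeout or the resources should be raised.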
Managing Pattern Configuration Per Deployment
Pattern configuration has a direct effect on operational efficiency. In general, the number of tasks scheduled inside of Flink is a function of:
- The collective number of patterns assigned per deployment.
- The collective complexity of patterns in the deployment (such as the number of computational elements and connections).
- The relationships among the patterns being deployed.
With these in mind, there are several strategies one can use to reduce Flink's operational load:
- Eliminating redundancies within and across patterns can reduce both the complexity of individual patterns and the number of patterns deployed.
- Patterns with significant internal complexity, or patterns that are otherwise independent from other patterns, can conserve resources if deployed alone.
- Patterns that depend heavily on one another may sometimes benefit from being deployed together. However, if the downstream pattern is significantly more complex than its upstream pattern, the rate at which it processes data may be much lower than the rate at which data becomes available to it, thus creating a bottleneck. In this case, it may be more advantageous to deploy the patterns separately.
Using Pattern-Level Features
Pattern-level features provide multiple sites for optimization. There are several pattern-level features to take advantage of:
- Windowing: Windowing can improve the efficiency of aggregations and checkpoints. It can configure aggregations to aggregate data only over a specific window, or discard the operational state past a certain window. These options may only be appropriate for a few applications, but should be used whenever they are viable.
- Time-to-Live (TTL): Optimizing through TTL follows the same logic as windowing. The main difference is that TTL chooses what to save and discard based on the expiration of a predefined time frame rather than a window.
- Insert-Only: The insert-only feature can cut down on unnecessary operations. If enabled, the first time the system encounters an entity, it marks the entity as having provided the necessary information. The system then avoids revisiting the entity unless further information is added.
- Internal Event Types: Pattern outcomes designated as internal event types do not publish to Kafka, and therefore do not involve Kafka read/write operations. This designation can be useful for transient operations (such as data transformations) where the outcome is intended to feed another pattern instead of providing actionable information.
- Aggregation Functions: Aggregation functions can be resource-intensive. Cut down on high-intensity aggregation functions, and aggregate data only when necessary.
Differentiating Historical Data and Live Data
Compared to live streaming data, historical data requires significantly more resources to process, because the volume of data to be processed at a given time is much higher than when streaming live data. For instance, processing historical data might involve reviewing 10,000 records per second, whereas live data might have only five records per second to review.
Given the volume of records involved, processing historical data benefits from high parallelism and high amounts of memory and CPU in the deployment configurations.
The resources required for streaming data depend on the data ingestion rate. If a data source produces five records per second with moderate pattern complexity, a parallelism of 1 with 2 GB RAM and 1 CPU will suffice. When processing larger data flows, the amount of resources and parallelism can be increased to reach the desired latency.
When deploying jobs for the first time, it is common to process historical data first in order to "catch up" to the most recent data. If large amounts of resources were provisioned to process historical data, those resources will not be put to good use once the job has caught up and is processing a much smaller data flow.
To save costs on resources:
- Pause the job to create a save point.
- Adjust the deployment's Flink configurations to reduce the task manager and parallelism settings.
- Resume the deployment. The job spins up with the new configurations.
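To judge when a job is likely to have caught up, and therefore when it may be worth pausing and scaling down, the backlog can be divided by the net rate at which it shrinks. The following is a minimal sketch; the processing and ingestion rates reuse the example figures from this section, while the backlog size and the function name are hypothetical.

```python
def catch_up_seconds(backlog_records: int, process_rate: float, ingest_rate: float) -> float:
    """Estimate seconds until a job working through a backlog reaches live data.

    The backlog shrinks at (process_rate - ingest_rate) records per second,
    because new records keep arriving while old ones are processed.
    """
    drain_rate = process_rate - ingest_rate
    if drain_rate <= 0:
        raise ValueError("job never catches up: processing is not faster than ingestion")
    return backlog_records / drain_rate


# Hypothetical backlog of 36 million records, processed at 10,000 records per
# second while live data arrives at 5 records per second.
seconds = catch_up_seconds(backlog_records=36_000_000, process_rate=10_000, ingest_rate=5)
print(f"about {seconds / 3600:.1f} hours to catch up")
```

The estimate is only as good as the measured rates, so treat it as a prompt to check the job rather than a schedule.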
Monitoring Performance Metrics
Flink's graph nodes are color-coded to indicate whether operations are proceeding as expected. Paying attention to the graph nodes can reveal points needing optimization:
- Blue means that the node is running with minimal load. Since it has resources to spare, it could be a candidate for additional assignments.
- Red indicates a bottleneck. This means the node requires your attention.
For additional details, expand the view to see a breakdown of tasks across all task managers and their status.
Using FlameGraph
FlameGraph is a utility for monitoring task resource consumption with high precision and detail. It reveals the amount of time and resources each task is consuming, down to the individual Java classes and operations consuming them.
To activate FlameGraph:
- Navigate to the custom configuration section of your deployment or deployment target.
- Add the setting rest.flamegraph.enabled: true.
Advanced Configurations
All Flink configurations can be used directly within Cogynt. They can be set as custom configurations within the deployment or deployment target settings of Cogynt.
Warning
Contact Cogility's technical support team before resorting to using custom configs.
Users should work with custom configs only when fine-tuning the main configurations available in the UI does not provide the desired performance.
An experienced Flink user may be able to take advantage of the custom config capability. However, Cogility has not internally tested every configuration to verify its impact on the job. Use these configurations at your own risk.
Refactoring Models
Refactoring is the process of restructuring an existing model to improve its operations without changing its functionality.
This section discusses methods for reviewing a model to identify points of improvement, and ways to implement those improvements quickly and safely.
Creating Versions and Backups
Before attempting to refactor a model, it is worth backing up the project or pattern under consideration. This way, you will have a functional copy on hand in the event that refactoring does not go as planned.
To create a backup:
- Make a copy of the model, project, or pattern you intend to refactor.
- Version the copy by giving it a meaningful name indicative of its date or state.
Updating Data Schemas
Refactoring data schemas typically involves:
- Updating user data schema field datatypes.
- Adding fields.
- Deleting fields.
Strategies for approaching these processes are outlined under the corresponding topic headings.
Updating User Data Schema Field Datatypes
Changing the datatype of a given user data schema field may allow its use in more efficient or effective computations. For instructions on editing fields, refer to Editing User Data Schema Fields in the Cogynt Authoring User Guide.
Before a field's datatype can be changed, the field must:
- Not be mapped to a function or outcome in computations of any pattern.
- Not be used in a constraint, partition, window, TTL, or batch output in any pattern.
If these criteria are met, the field's datatype can be changed at will.
If the criteria are not met, updating the field requires additional steps:
- Disconnect the field from all dependencies until it satisfies the preceding criteria.
- Change the datatype.
- Reconnect everything again to restore the original dependencies.
If the field is too intertwined with the model, such that disconnecting all the dependencies would require significant effort, the recommended workaround is to add a new field to replace the old field:
- Rename the old field with a placeholder (for example, append (old) to the field's name), then set the name and path of the new field to the original field name and path.
- Go through all the patterns that have a dependency on the original field. Un-map all the connections to the old field and map them to the new field with the updated datatype.
- Replace all the partitions, constraints, windowing fields, and so on that contain the old field, reconfiguring them with the new field.
- Once all the dependencies have been remapped, delete the old field.
Note
If deleting the old field triggers a warning message, it means some dependencies were missed. Review the dependencies attached to the old field, and remap any that are still connected.
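The order of operations in this workaround can be made concrete with a toy model. Cogynt performs these steps through its user interface; the sketch below is not a Cogynt API. It uses plain dictionaries as hypothetical stand-ins for a schema and its dependencies, and all names are illustrative.

```python
def delete_field(schema: dict, usages: dict, name: str) -> None:
    """Delete a field, refusing if anything still depends on it."""
    if usages.get(name):
        raise RuntimeError(f"'{name}' still has dependencies: {sorted(usages[name])}")
    schema.pop(name)
    usages.pop(name, None)


def replace_field(schema: dict, usages: dict, name: str, new_type: str) -> None:
    """Swap in a new field with `new_type`, then retire the old one.

    `schema` maps field name -> datatype; `usages` maps field name -> set of
    pattern elements that depend on the field. Both are toy stand-ins.
    """
    old_name = f"{name} (old)"
    # Rename the old field, then create the new field under the original name.
    schema[old_name] = schema.pop(name)
    usages[old_name] = usages.pop(name, set())
    schema[name] = new_type
    usages[name] = set()
    # Move every dependency from the old field to the new field.
    while usages[old_name]:
        usages[name].add(usages[old_name].pop())
    # Delete the old field; this fails if any dependency was missed.
    delete_field(schema, usages, old_name)


schema = {"amount": "string"}
usages = {"amount": {"pattern A: constraint", "pattern B: sum()"}}
replace_field(schema, usages, "amount", "float")
print(schema)
print(sorted(usages["amount"]))
```

The guard in `delete_field` plays the role of the warning message: deletion succeeds only after every dependency has been moved.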
Adding Fields
Adding new fields is a useful means of creating temporary placeholders or introducing new places to work on a model.
For instructions on adding fields, refer to Adding Fields to User Data Schemas in the Cogynt Authoring User Guide.
Deleting Fields
Deleting unnecessary fields can help to streamline a model. For instructions on deleting fields, refer to Deleting User Data Schema Fields in the Cogynt Authoring User Guide.
Before a field can be deleted, it must:
- Not be mapped to a function or outcome in computations of any pattern.
- Not be used in a constraint, partition, window, TTL, or batch output in any pattern.
If these criteria are met, the field can be deleted.
If the criteria are not met, the field must be disconnected from all dependencies before attempting to delete it.
Note
If deleting the field triggers a warning message, it means some dependencies were missed. Review the dependencies attached to the field, and disconnect any that are still connected.
Updating Patterns
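Updating patterns can involve modifying the logic of:
- Constraints.
- Partitions.
- Computations.
- Risk definitions.
- Other pattern configurations (TTL, windowing, and output batching).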
Updates to any pattern-level configuration should always be done mindfully. Pattern updates can cause significant changes to pattern logic, meaning that the results could have far different interpretations or implications than they did prior to refactoring.
If the intent of refactoring is to have the changes produce results that can seamlessly integrate with past results (that is, without disrupting workstation or other dashboard visualizations), one should be cautious when making pattern-level changes, especially when modifying partitions.
Changing Constraints
Changes to constraints modify the rules that determine whether a pattern will trigger an output. As long as the partitions stay the same, constraints can change to adapt to new rules, and the new results will still integrate safely with the pattern's previous results.
The exception is for results that were valid in the past, but are no longer valid after the update. Therefore, in most situations, it is recommended to reset all output topics when redeploying.
If using collections in Workstation as part of a workflow, a reference is created between events and collections the moment an event is placed in the collection.
Changing Partitions
Updating partitions can drastically change the meaning and implications of the outcome of the pattern.
Users must be especially mindful when updating partitions. Doing so will likely require computation changes and updates in downstream patterns (in other words, patterns that consume the outcome of the pattern in question as an input).
Changing Computations
When the partitions have changed in a pattern, aggregations sometimes need to be adjusted to accurately compute new results.
Additional results may also be required. Follow the field update recommendations on how to best make changes to fields in an event type.
Changing Risk Definitions
Adding or deleting input elements in patterns sometimes requires adjusting the risk table.
Review the weights or the Bayesian table after every substantial addition or deletion, and adjust them as necessary.
Other Pattern Configurations
- TTL: Be mindful within computations that TTL changes the events that are included in aggregation operations.
- Windowing: Similar to TTL, modifying windowing configurations also changes the events that are included in aggregations. More importantly, adding and removing windowing will almost certainly affect the way downstream patterns interpret the outcome of the modified pattern.
- Output Batching: Output batching has minimal impact on the logical interpretation of pattern results. When used properly, it reduces the data load of downstream patterns. The main consideration is to ensure that the batch period used continues to satisfy any latency requirements the project has.
Updating Deployments
The performance increases from refactoring are not realized until the project's deployment is updated and dispatched.
The following sections outline some considerations to keep in mind when updating deployments.
Flink Configurations
Often, upon first deployment, a significant amount of historical data needs to be processed before the job catches up to the most current state.
Users are encouraged to pause and change deployment settings to optimize the operational cost and resource consumption once processing has caught up to the present. For more information about optimization, see Optimizing Performance and Resources.
Patterns
The list of patterns deployed cannot be updated while the deployment is paused. Otherwise, the pattern list has relatively low impact on the overall deployment workflow.
Updating Deployment Targets and Datasources
Deployment targets can only be updated when no deployments are actively running that use the deployment target.
Each event type references a data source alias. By default, the alias is default. Should the user choose to change the Kafka broker connection string, the deployments likely need to be redeployed.
If the user wishes to deploy and publish the results of a project to additional Kafka brokers, it is advised to make a copy of the project and deploy it separately, rather than creating duplicate deployments within the same project. Separate projects are easier to manage, and they avoid complications later if the two Kafka publishing locations ever need different configurations.