Integration
Integrating Connect with Blockchain Applications
Connect is business-licensed software under the Business Source License (BSL). The source code is viewable; however, please reach out to us if you are interested in integrating. We are limiting the number of partnerships we engage with in 2024. We apologize in advance if we reach capacity and are unable to accommodate new integrations.
This document will guide you through integrating Connect in your application.
Requirements
- Go 1.22+
- Cosmos SDK v0.50+
Integrating Connect
Integrating Connect into your Cosmos SDK application requires a few simple steps.
Add Oracle Configuration
Add the oracle configuration to your application.
Add Modules
Import and add the x/marketmap
and x/oracle
Modules to your app.
Setup Oracle Client
Set up the Oracle client the application will use to get prices from the Connect oracle.
Set ABCI Method
Set the PreBlock ABCI method, which is responsible for aggregating prices and writing them to the application’s state.
Configure Vote Extensions
Configure vote extensions with compression and storage strategies.
Application Configuration
The application node’s configuration must be extended so the oracle configuration can be read into the node through app.toml
.
The application should contain a custom configuration struct with a "github.com/cosmos/cosmos-sdk/server/config"
embedded.
Note: application function and type names may vary. The names in the following steps are arbitrary for example purposes only.
// CustomAppConfig defines the configuration for the app.
type CustomAppConfig struct {
serverconfig.Config
// ... other configurations
Oracle oracleconfig.AppConfig `mapstructure:"oracle" json:"oracle"`
}
Next, append the Oracle’s default config template to the custom application template.
func CustomConfigTemplate() string {
return serverconfig.DefaultConfigTemplate + oracleconfig.DefaultConfigTemplate
}
Finally, add a default configuration.
func DefaultConfig() (string, CustomAppConfig) {
serverConfig := serverconfig.DefaultConfig()
// edit serverConfig as needed
oracleCfg := oracleconfig.AppConfig{
Enabled: true,
OracleAddress: "localhost:8080",
ClientTimeout: time.Second * 2,
MetricsEnabled: true,
}
customConfig := CustomAppConfig{
Config: *serverConfig,
Oracle: oracleCfg,
}
return CustomConfigTemplate(), customConfig
}
The template and default configuration should be passed into server.InterceptConfigsPreRunHandler
in the application’s root command.
Example:
package cmd
import (
// ...
"github.com/cosmos/cosmos-sdk/server"
)
func NewRootCmd() *cobra.Command {
// ....
customAppTemplate, customAppConfig := DefaultConfig() // call function from previous step
return server.InterceptConfigsPreRunHandler(cmd, customAppTemplate, customAppConfig, cometConfig)
}
Keepers
Add x/marketmap and x/oracle keepers to the application.
package app
import (
// ... other imports
marketmapkeeper "github.com/skip-mev/connect/v2/x/marketmap/keeper"
oraclekeeper "github.com/skip-mev/connect/v2/x/oracle/keeper"
)
type App struct {
// ... other fields
OracleKeeper *oraclekeeper.Keeper
MarketMapKeeper *marketmapkeeper.Keeper
}
Then, add them to the dependency injection system.
err := depinject.Inject(
// ... other arguments
&app.MarketMapKeeper,
&app.OracleKeeper,
)
Finally, once the app is built with the appBuilder
, finish the initialization of the MarketMapKeeper
by setting the hooks.
app.App = appBuilder.Build(db, traceStore, baseAppOptions...)
app.MarketMapKeeper.SetHooks(app.OracleKeeper.Hooks())
Oracle Client
Create a method to construct and return the oracle client and metrics.
package app
import (
"context"
"github.com/cosmos/cosmos-sdk/server/types"
oracleconfig "github.com/skip-mev/connect/v2/oracle/config"
oracleclient "github.com/skip-mev/connect/v2/service/clients/oracle"
servicemetrics "github.com/skip-mev/connect/v2/service/metrics"
)
// initializeOracle initializes the oracle client and metrics.
func (app *App) initializeOracle(appOpts types.AppOptions) (oracleclient.OracleClient, servicemetrics.Metrics, error) {
// Read general config from app-opts, and construct oracle service.
cfg, err := oracleconfig.ReadConfigFromAppOpts(appOpts)
if err != nil {
return nil, nil, err
}
// If app level instrumentation is enabled, then wrap the oracle service with a metrics client
// to get metrics on the oracle service (for ABCI++). This will allow the instrumentation to track
// latency in VerifyVoteExtension requests and more.
oracleMetrics, err := servicemetrics.NewMetricsFromConfig(cfg, app.ChainID())
if err != nil {
return nil, nil, err
}
// Create the oracle service.
oracleClient, err := oracleclient.NewPriceDaemonClientFromConfig(
cfg,
app.Logger().With("client", "oracle"),
oracleMetrics,
)
if err != nil {
return nil, nil, err
}
// Connect to the oracle service (default timeout of 5 seconds).
go func() {
app.Logger().Info("attempting to start oracle client...", "address", cfg.OracleAddress)
if err := oracleClient.Start(context.Background()); err != nil {
app.Logger().Error("failed to start oracle client", "err", err)
panic(err)
}
}()
return oracleClient, oracleMetrics, nil
}
ABCI and Vote Extensions
Configure the ABCI methods and vote extensions.
Define a method to contain the logic where these will be configured.
func (app *App) initializeABCIExtensions(oracleClient oracleclient.OracleClient, oracleMetrics servicemetrics.Metrics) {}
Within this method, do the following:
- Setup Proposal Handler: This handler will be used in
PrepareProposal
andProcessProposal
to fill proposals with the oracle data. - Set PreBlocker: The application’s
PreBlocker
will be configured to write price data to state before transactions are executed. - Set Vote Extensions: Set the vote extension handlers on the application that will handle adding price data to the node’s consensus votes.
Start with setting up the proposal handler.
package app
import (
oracleclient "github.com/skip-mev/connect/v2/service/clients/oracle"
servicemetrics "github.com/skip-mev/connect/v2/service/metrics"
"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/skip-mev/connect/v2/abci/proposals"
compression "github.com/skip-mev/connect/v2/abci/strategies/codec"
"github.com/skip-mev/connect/v2/abci/strategies/currencypair"
"github.com/skip-mev/connect/v2/abci/ve"
)
func (app *App) initializeABCIExtensions(oracleClient oracleclient.OracleClient, oracleMetrics servicemetrics.Metrics) {
// Create the proposal handler that will be used to fill proposals with
// transactions and oracle data.
proposalHandler := proposals.NewProposalHandler(
app.Logger(),
baseapp.NoOpPrepareProposal(),
baseapp.NoOpProcessProposal(),
ve.NewDefaultValidateVoteExtensionsFn(app.StakingKeeper),
compression.NewCompressionVoteExtensionCodec(
compression.NewDefaultVoteExtensionCodec(),
compression.NewZLibCompressor(),
),
compression.NewCompressionExtendedCommitCodec(
compression.NewDefaultExtendedCommitCodec(),
compression.NewZStdCompressor(),
),
currencypair.NewDeltaCurrencyPairStrategy(app.OracleKeeper),
oracleMetrics,
)
app.SetPrepareProposal(proposalHandler.PrepareProposalHandler())
app.SetProcessProposal(proposalHandler.ProcessProposalHandler())
}
Next, set up the PreBlocker
. This involves:
- Aggregate Function: Setting the aggregator function that combines all reported prices into one final price per currency pair.
- Currency Pair Strategy: Setting the currency pair strategy. For this example, we will use the
DeltaCurrencyPairStrategy
which encodes/decodes the price as the difference between the current price and the previous price. While other strategies are available, we recommend this one for most applications. - Data Compression Codecs: Setting the compression strategy for vote extensions and extended commits.
package app
import (
oracleclient "github.com/skip-mev/connect/v2/service/clients/oracle"
servicemetrics "github.com/skip-mev/connect/v2/service/metrics"
oraclepreblock "github.com/skip-mev/connect/v2/abci/preblock/oracle"
compression "github.com/skip-mev/connect/v2/abci/strategies/codec"
"github.com/skip-mev/connect/v2/abci/strategies/currencypair"
"github.com/skip-mev/connect/v2/pkg/math/voteweighted"
)
func (app *App) initializeABCIExtensions(oracleClient oracleclient.OracleClient, oracleMetrics servicemetrics.Metrics) {
// ...
// Create the aggregation function that will be used to aggregate oracle data
// from each validator.
aggregatorFn := voteweighted.MedianFromContext(
app.Logger(),
app.StakingKeeper,
voteweighted.DefaultPowerThreshold,
)
veCodec := compression.NewCompressionVoteExtensionCodec(
compression.NewDefaultVoteExtensionCodec(),
compression.NewZLibCompressor(),
)
ecCodec := compression.NewCompressionExtendedCommitCodec(
compression.NewDefaultExtendedCommitCodec(),
compression.NewZStdCompressor(),
)
// Create the pre-finalize block hook that will be used to apply oracle data
// to the state before any transactions are executed (in finalize block).
oraclePreBlockHandler := oraclepreblock.NewOraclePreBlockHandler(
app.Logger(),
aggregatorFn,
app.OracleKeeper,
oracleMetrics,
currencypair.NewDeltaCurrencyPairStrategy(app.OracleKeeper), // IMPORTANT: always construct new currency pair strategy objects when functions require them as arguments.
veCodec,
ecCodec,
)
app.SetPreBlocker(oraclePreBlockHandler.WrappedPreBlocker(app.ModuleManager))
}
Next, configure the vote extensions using the vote extension codec, extended commit codec, and aggregator function from the previous step.
package app
import (
"time"
oracleclient "github.com/skip-mev/connect/v2/service/clients/oracle"
servicemetrics "github.com/skip-mev/connect/v2/service/metrics"
"github.com/skip-mev/connect/v2/abci/ve"
"github.com/skip-mev/connect/v2/abci/strategies/currencypair"
"github.com/skip-mev/connect/v2/abci/strategies/aggregator"
)
func (app *App) initializeABCIExtensions(oracleClient oracleclient.OracleClient, oracleMetrics servicemetrics.Metrics) {
// ... snip ...
// Create the vote extensions handler that will be used to extend and verify
// vote extensions (i.e. oracle data).
voteExtensionsHandler := ve.NewVoteExtensionHandler(
app.Logger(),
oracleClient,
time.Second, // timeout
currencypair.NewDeltaCurrencyPairStrategy(app.OracleKeeper), // IMPORTANT: always construct new currency pair strategy objects when functions require them as arguments.
veCodec,
aggregator.NewOraclePriceApplier(
aggregator.NewDefaultVoteAggregator(
app.Logger(),
aggregatorFn,
// we need a separate price strategy here, so that we can optimistically apply the latest prices
// and extend our vote based on these prices
currencypair.NewDeltaCurrencyPairStrategy(app.OracleKeeper), // IMPORTANT: always construct new currency pair strategy objects when functions require them as arguments.
),
app.OracleKeeper,
veCodec,
ecCodec,
app.Logger(),
),
oracleMetrics,
)
app.SetExtendVoteHandler(voteExtensionsHandler.ExtendVoteHandler())
app.SetVerifyVoteExtensionHandler(voteExtensionsHandler.VerifyVoteExtensionHandler())
}
Finally, call these methods back in app.go
, directly after setting the x/marketmap
hooks.
app.MarketMapKeeper.SetHooks(app.OracleKeeper.Hooks())
// oracle initialization
client, metrics, err := app.initializeOracle(appOpts)
if err != nil {
return nil, fmt.Errorf("failed to initialize oracle client and metrics: %w", err)
}
app.initializeABCIExtensions(client, metrics)
Initializing Modules
In order for the application to use Connect properly, the following is required:
- Set the consensus parameters to enable vote extensions
- Initialize
x/marketmap
with initial markets
package app
import (
"slices"
tmtypes "github.com/cometbft/cometbft/proto/tendermint/types"
sdk "github.com/cosmos/cosmos-sdk/types"
consensustypes "github.com/cosmos/cosmos-sdk/x/consensus/types"
"github.com/skip-mev/connect/v2/cmd/constants/marketmaps"
)
func (app *App) setupMarkets(ctx sdk.Context) error {
// add core markets
coreMarkets := marketmaps.CoreMarketMap
markets := coreMarkets.Markets
// sort keys so we can deterministically iterate over map items.
keys := make([]string, 0, len(markets))
for name := range markets {
keys = append(keys, name)
}
slices.Sort(keys)
for _, marketName := range keys {
// create market
market := markets[marketName]
err := app.MarketMapKeeper.CreateMarket(ctx, market)
if err != nil {
return err
}
// invoke hooks. this syncs the market to x/oracle.
err = app.MarketMapKeeper.Hooks().AfterMarketCreated(ctx, market)
if err != nil {
return err
}
}
return nil
}
For new chains, or to test the integration, the method above can be called in InitChainer
. Connect will begin posting prices to the chain once the VoteExtensionsEnabledHeight
is reached.
package app
func NewApp(
logger log.Logger,
db dbm.DB,
traceStore io.Writer,
loadLatest bool,
appOpts servertypes.AppOptions,
baseAppOptions ...func(*baseapp.BaseApp),
) *App {
// ...
// initialize the chain with markets in state.
app.SetInitChainer(func(ctx sdk.Context, req *types.RequestInitChain) (*types.ResponseInitChain, error) {
// set vote extension height. (must be greater than 1).
req.ConsensusParams.Abci.VoteExtensionsEnableHeight = 2
// initialize module state
app.OracleKeeper.InitGenesis(ctx, *oracletypes.DefaultGenesisState())
app.MarketMapKeeper.InitGenesis(ctx, *marketmaptypes.DefaultGenesisState())
// initialize markets
err := app.setupMarkets(ctx)
if err != nil {
return nil, err
}
return app.App.InitChainer(ctx, req)
})
// ...
}
For live running chains, use an upgrade handler. Note: Connect will not post prices to the chain until the upgrade is executed.
package app
func NewApp(
logger log.Logger,
db dbm.DB,
traceStore io.Writer,
loadLatest bool,
appOpts servertypes.AppOptions,
baseAppOptions ...func(*baseapp.BaseApp),
) *App {
// ...
connectUpgradeName := "connect-upgrade" // placeholder value, use a real upgrade name.
app.UpgradeKeeper.SetUpgradeHandler(connectUpgradeName, func(ctx context.Context, plan upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) {
migrations, err := app.ModuleManager.RunMigrations(ctx, app.Configurator(), fromVM)
if err != nil {
return nil, err
}
consensusParams, err := app.ConsensusParamsKeeper.Params(ctx, nil)
if err != nil {
return nil, err
}
consensusParams.Params.Abci = &tmtypes.ABCIParams{
VoteExtensionsEnableHeight: ctx.BlockHeight() + int64(10), // enables VE's at current_height + 10.
}
_, err = app.ConsensusParamsKeeper.UpdateParams(ctx, &consensustypes.MsgUpdateParams{
Authority: app.ConsensusParamsKeeper.GetAuthority(),
Block: consensusParams.Params.Block,
Evidence: consensusParams.Params.Evidence,
Validator: consensusParams.Params.Validator,
Abci: consensusParams.Params.Abci,
})
if err != nil {
return nil, err
}
// add the markets to the chain state.
err := app.setupMarkets(ctx)
if err != nil {
return migrations, err
}
return migrations, nil
})
upgradeInfo, err := app.UpgradeKeeper.ReadUpgradeInfoFromDisk()
if err != nil {
panic(fmt.Errorf("failed to read upgrade info from disk: %w", err))
}
// add the x/marketmap and x/oracle stores.
if upgradeInfo.Name == connectUpgradeName {
app.SetStoreLoader(
upgradetypes.UpgradeStoreLoader(
upgradeInfo.Height,
&storetypes.StoreUpgrades{
Added: []string{marketmaptypes.ModuleName, oracletypes.ModuleName},
Renamed: nil,
Deleted: nil,
},
),
)
}
// ...
}
Running the Node
Once the chain is properly configured, head over to the Quickstart guide to learn how to start the node with a Connect sidecar.
Need Help?
Need help with your integration? Feel free to reach out to us on Discord.