Connect is business-licensed software under the Business Source License (BSL). The source code is viewable; however, please reach out to us if you are interested in integrating. We are limiting the number of partnerships we engage with in 2024. We apologize in advance if we reach capacity and are unable to accommodate new integrations.

This document will guide you through integrating Connect in your application.

Requirements

  • Go 1.23+
  • Cosmos SDK v0.50+

Integrating Connect

Integrating Connect into your Cosmos SDK application requires a few simple steps.

1

Add Oracle Configuration

Add the oracle configuration to your application.

2

Add Modules

Import and add the x/marketmap and x/oracle Modules to your app.

3

Setup Oracle Client

Set up the Oracle client the application will use to get prices from the Connect oracle.

4

Set ABCI Method

Set the PreBlock ABCI method, which is responsible for aggregating prices and writing them to the application’s state.

5

Configure Vote Extensions

Configure vote extensions with compression and storage strategies.

Application Configuration

The application node’s configuration must be extended so the oracle configuration can be read into the node through app.toml.

The application should contain a custom configuration struct with a "github.com/cosmos/cosmos-sdk/server/config" embedded.

Note: application function and type names may vary. The names in the following steps are arbitrary for example purposes only.

config.go
// CustomAppConfig defines the configuration for the app.
type CustomAppConfig struct {
	serverconfig.Config
	// ... other configurations
	Oracle oracleconfig.AppConfig `mapstructure:"oracle" json:"oracle"`
}

Next, append the Oracle’s default config template to the custom application template.

config.go
func CustomConfigTemplate() string {
	return serverconfig.DefaultConfigTemplate + oracleconfig.DefaultConfigTemplate
}

Finally, add a default configuration.

config.go
func DefaultConfig() (string, CustomAppConfig) {
	serverConfig := serverconfig.DefaultConfig()
    // edit serverConfig as needed

	oracleCfg := oracleconfig.AppConfig{
		Enabled:        true,
		OracleAddress:  "localhost:8080",
		ClientTimeout:  time.Second * 2,
		MetricsEnabled: true,
	}
	customConfig := CustomAppConfig{
		Config: *serverConfig,
		Oracle: oracleCfg,
	}

	return CustomConfigTemplate(), customConfig
}

The template and default configuration should be passed into server.InterceptConfigsPreRunHandler in the application’s root command.

Example:

root.go
package cmd

import (
	// ...
	"github.com/cosmos/cosmos-sdk/server"
)
func NewRootCmd() *cobra.Command {
	// ....

    customAppTemplate, customAppConfig := DefaultConfig() // call function from previous step
    return server.InterceptConfigsPreRunHandler(cmd, customAppTemplate, customAppConfig, cometConfig)
}

Keepers

Add x/marketmap and x/oracle keepers to the application.

app.go
package app

import (
    // ... other imports

	marketmapkeeper "github.com/skip-mev/connect/v2/x/marketmap/keeper"
	oraclekeeper "github.com/skip-mev/connect/v2/x/oracle/keeper"
)

type App struct {
	// ... other fields

	OracleKeeper     *oraclekeeper.Keeper
	MarketMapKeeper  *marketmapkeeper.Keeper
}

Then, add them to the dependency injection system.

app.go
	err := depinject.Inject(
		// ... other arguments
		&app.MarketMapKeeper,
		&app.OracleKeeper,
	)

Finally, once the app is built with the appBuilder, finish the initialization of the MarketMapKeeper by setting the hooks.

app.go
	app.App = appBuilder.Build(db, traceStore, baseAppOptions...)

	app.MarketMapKeeper.SetHooks(app.OracleKeeper.Hooks())

Oracle Client

Create a method to construct and return the oracle client and metrics.

oracle.go
package app

import (
	"context"

    "github.com/cosmos/cosmos-sdk/server/types"
    oracleconfig "github.com/skip-mev/connect/v2/oracle/config"
    oracleclient "github.com/skip-mev/connect/v2/service/clients/oracle"
    servicemetrics "github.com/skip-mev/connect/v2/service/metrics"
)

// initializeOracle initializes the oracle client and metrics.
func (app *App) initializeOracle(appOpts types.AppOptions) (oracleclient.OracleClient, servicemetrics.Metrics, error) {
	// Read general config from app-opts, and construct oracle service.
	cfg, err := oracleconfig.ReadConfigFromAppOpts(appOpts)
	if err != nil {
		return nil, nil, err
	}

	// If app level instrumentation is enabled, then wrap the oracle service with a metrics client
	// to get metrics on the oracle service (for ABCI++). This will allow the instrumentation to track
	// latency in VerifyVoteExtension requests and more.
	oracleMetrics, err := servicemetrics.NewMetricsFromConfig(cfg, app.ChainID())
	if err != nil {
		return nil, nil, err
	}

	// Create the oracle service.
	oracleClient, err := oracleclient.NewPriceDaemonClientFromConfig(
		cfg,
		app.Logger().With("client", "oracle"),
		oracleMetrics,
	)
	if err != nil {
		return nil, nil, err
	}

	// Connect to the oracle service (default timeout of 5 seconds).
	go func() {
		app.Logger().Info("attempting to start oracle client...", "address", cfg.OracleAddress)
		if err := oracleClient.Start(context.Background()); err != nil {
			app.Logger().Error("failed to start oracle client", "err", err)
			panic(err)
		}
	}()

	return oracleClient, oracleMetrics, nil
}

ABCI and Vote Extensions

Configure the ABCI methods and vote extensions.

Define a method to contain the logic where these will be configured.

oracle.go
func (app *App) initializeABCIExtensions(oracleClient oracleclient.OracleClient, oracleMetrics servicemetrics.Metrics) {}

Within this method, do the following:

  • Setup Proposal Handler: This handler will be used in PrepareProposal and ProcessProposal to fill proposals with the oracle data.
  • Set PreBlocker: The application’s PreBlocker will be configured to write price data to state before transactions are executed.
  • Set Vote Extensions: Set the vote extension handlers on the application that will handle adding price data to the node’s consensus votes.

Start with setting up the proposal handler.

oracle.go
package app

import (
	oracleclient "github.com/skip-mev/connect/v2/service/clients/oracle"
    servicemetrics "github.com/skip-mev/connect/v2/service/metrics"

    "github.com/cosmos/cosmos-sdk/baseapp"
    "github.com/skip-mev/connect/v2/abci/proposals"
    compression "github.com/skip-mev/connect/v2/abci/strategies/codec"
    "github.com/skip-mev/connect/v2/abci/strategies/currencypair"
    "github.com/skip-mev/connect/v2/abci/ve"
)

func (app *App) initializeABCIExtensions(oracleClient oracleclient.OracleClient, oracleMetrics servicemetrics.Metrics) {
    // Create the proposal handler that will be used to fill proposals with
	// transactions and oracle data.
	proposalHandler := proposals.NewProposalHandler(
		app.Logger(),
		baseapp.NoOpPrepareProposal(),
		baseapp.NoOpProcessProposal(),
		ve.NewDefaultValidateVoteExtensionsFn(app.StakingKeeper),
		compression.NewCompressionVoteExtensionCodec(
			compression.NewDefaultVoteExtensionCodec(),
			compression.NewZLibCompressor(),
		),
		compression.NewCompressionExtendedCommitCodec(
			compression.NewDefaultExtendedCommitCodec(),
			compression.NewZStdCompressor(),
		),
		currencypair.NewDeltaCurrencyPairStrategy(app.OracleKeeper),
		oracleMetrics,
	)
	app.SetPrepareProposal(proposalHandler.PrepareProposalHandler())
	app.SetProcessProposal(proposalHandler.ProcessProposalHandler())
}

Next, set up the PreBlocker. This involves:

  • Aggregate Function: Setting the aggregator function that combines all reported prices into one final price per currency pair.
  • Currency Pair Strategy: Setting the currency pair strategy. For this example, we will use the DeltaCurrencyPairStrategy which encodes/decodes the price as the difference between the current price and the previous price. While other strategies are available, we recommend this one for most applications.
  • Data Compression Codecs: Setting the compression strategy for vote extensions and extended commits.
oracle.go
package app

import (
    oracleclient "github.com/skip-mev/connect/v2/service/clients/oracle"
    servicemetrics "github.com/skip-mev/connect/v2/service/metrics"

    oraclepreblock "github.com/skip-mev/connect/v2/abci/preblock/oracle"
    compression "github.com/skip-mev/connect/v2/abci/strategies/codec"
    "github.com/skip-mev/connect/v2/abci/strategies/currencypair"
    "github.com/skip-mev/connect/v2/pkg/math/voteweighted"
)

func (app *App) initializeABCIExtensions(oracleClient oracleclient.OracleClient, oracleMetrics servicemetrics.Metrics) {
	// ... 

	// Create the aggregation function that will be used to aggregate oracle data
	// from each validator.
	aggregatorFn := voteweighted.MedianFromContext(
		app.Logger(),
		app.StakingKeeper,
		voteweighted.DefaultPowerThreshold,
	)
	veCodec := compression.NewCompressionVoteExtensionCodec(
		compression.NewDefaultVoteExtensionCodec(),
		compression.NewZLibCompressor(),
	)
	ecCodec := compression.NewCompressionExtendedCommitCodec(
		compression.NewDefaultExtendedCommitCodec(),
		compression.NewZStdCompressor(),
	)

	// Create the pre-finalize block hook that will be used to apply oracle data
	// to the state before any transactions are executed (in finalize block).
	oraclePreBlockHandler := oraclepreblock.NewOraclePreBlockHandler(
		app.Logger(),
		aggregatorFn,
		app.OracleKeeper,
		oracleMetrics,
		currencypair.NewDeltaCurrencyPairStrategy(app.OracleKeeper), // IMPORTANT: always construct new currency pair strategy objects when functions require them as arguments.
		veCodec,
		ecCodec,
	)

	app.SetPreBlocker(oraclePreBlockHandler.WrappedPreBlocker(app.ModuleManager))
}

Next, configure the vote extensions using the vote extension codec, extended commit codec, and aggregator function from the previous step.

oracle.go
package app

import (
	"time"

	oracleclient "github.com/skip-mev/connect/v2/service/clients/oracle"
        servicemetrics "github.com/skip-mev/connect/v2/service/metrics"

	"github.com/skip-mev/connect/v2/abci/ve"
	"github.com/skip-mev/connect/v2/abci/strategies/currencypair"
	"github.com/skip-mev/connect/v2/abci/strategies/aggregator"
)

func (app *App) initializeABCIExtensions(oracleClient oracleclient.OracleClient, oracleMetrics servicemetrics.Metrics) {

	// ... snip ...

	// Create the vote extensions handler that will be used to extend and verify
	// vote extensions (i.e. oracle data).
	voteExtensionsHandler := ve.NewVoteExtensionHandler(
		app.Logger(),
		oracleClient,
		time.Second, // timeout
		currencypair.NewDeltaCurrencyPairStrategy(app.OracleKeeper), // IMPORTANT: always construct new currency pair strategy objects when functions require them as arguments.
		veCodec,
		aggregator.NewOraclePriceApplier(
			aggregator.NewDefaultVoteAggregator(
				app.Logger(),
				aggregatorFn,
				// we need a separate price strategy here, so that we can optimistically apply the latest prices
				// and extend our vote based on these prices
				currencypair.NewDeltaCurrencyPairStrategy(app.OracleKeeper), // IMPORTANT: always construct new currency pair strategy objects when functions require them as arguments.
			),
			app.OracleKeeper,
			veCodec,
			ecCodec,
			app.Logger(),
		),
		oracleMetrics,
	)
	app.SetExtendVoteHandler(voteExtensionsHandler.ExtendVoteHandler())
	app.SetVerifyVoteExtensionHandler(voteExtensionsHandler.VerifyVoteExtensionHandler())
}

Finally, call these methods back in app.go, directly after setting the x/marketmap hooks.

app.go
	app.MarketMapKeeper.SetHooks(app.OracleKeeper.Hooks())

	// oracle initialization
	client, metrics, err := app.initializeOracle(appOpts)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize oracle client and metrics: %w", err)
	}

	app.initializeABCIExtensions(client, metrics)

Initializing Modules

In order for the application to use Connect properly, the following is required:

  • Set the consensus parameters to enable vote extensions
  • Initialize x/marketmap with initial markets
oracle.go
package app

import (
	"slices"

	tmtypes "github.com/cometbft/cometbft/proto/tendermint/types"
	sdk "github.com/cosmos/cosmos-sdk/types"
	consensustypes "github.com/cosmos/cosmos-sdk/x/consensus/types"
	"github.com/skip-mev/connect/v2/cmd/constants/marketmaps"
)

func (app *App) setupMarkets(ctx sdk.Context) error {
    // add core markets
    coreMarkets := marketmaps.CoreMarketMap
    markets := coreMarkets.Markets

    // sort keys so we can deterministically iterate over map items.
    keys := make([]string, 0, len(markets))
    for name := range markets {
        keys = append(keys, name)
    }
    slices.Sort(keys)

    for _, marketName := range keys {
        // create market
        market := markets[marketName]
        err := app.MarketMapKeeper.CreateMarket(ctx, market)
        if err != nil {
            return err
        }

        // invoke hooks. this syncs the market to x/oracle.
        err = app.MarketMapKeeper.Hooks().AfterMarketCreated(ctx, market)
        if err != nil {
            return err
        }
    }
    
    return nil
}

For new chains, or to test the integration, the method above can be called in InitChainer. Connect will begin posting prices to the chain once the VoteExtensionsEnabledHeight is reached.

app.go
package app

func NewApp(
	logger log.Logger,
	db dbm.DB,
	traceStore io.Writer,
	loadLatest bool,
	appOpts servertypes.AppOptions,
	baseAppOptions ...func(*baseapp.BaseApp),
) *App {
    // ...

	// initialize the chain with markets in state.
    app.SetInitChainer(func(ctx sdk.Context, req *types.RequestInitChain) (*types.ResponseInitChain, error) {
            consensusParams, err := app.ConsensusParamsKeeper.Params(ctx, nil)
            if err != nil {
                    return nil, err
            }
            consensusParams.Params.Abci = &types.ABCIParams{
                    VoteExtensionsEnableHeight: 5, // must be greater than 1
            }
            _, err = app.ConsensusParamsKeeper.UpdateParams(ctx, &consensustypes.MsgUpdateParams{
                    Authority: app.ConsensusParamsKeeper.GetAuthority(),
                    Block:     consensusParams.Params.Block,
                    Evidence:  consensusParams.Params.Evidence,
                    Validator: consensusParams.Params.Validator,
                    Abci:      consensusParams.Params.Abci,
            })
            if err != nil {
                    return nil, err
            }

            // initialize module state
            app.OracleKeeper.InitGenesis(ctx, *oracletypes.DefaultGenesisState())
            app.MarketMapKeeper.InitGenesis(ctx, *marketmaptypes.DefaultGenesisState())

            // initialize markets
            err := app.setupMarkets(ctx)
            if err != nil {
                return nil, err
            }

            return app.App.InitChainer(ctx, req)
	})

	// ...
}

For live running chains, use an upgrade handler. Note: Connect will not post prices to the chain until the upgrade is executed.

app.go
package app

func NewApp(
	logger log.Logger,
	db dbm.DB,
	traceStore io.Writer,
	loadLatest bool,
	appOpts servertypes.AppOptions,
	baseAppOptions ...func(*baseapp.BaseApp),
) *App {
    // ...

	connectUpgradeName := "connect-upgrade" // placeholder value, use a real upgrade name.

	app.UpgradeKeeper.SetUpgradeHandler(connectUpgradeName, func(ctx context.Context, plan upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) {
        migrations, err := app.ModuleManager.RunMigrations(ctx, app.Configurator(), fromVM)
        if err != nil {
            return nil, err
        }

        consensusParams, err := app.ConsensusParamsKeeper.Params(ctx, nil)
        if err != nil {
            return nil, err
        }
        consensusParams.Params.Abci = &tmtypes.ABCIParams{
            VoteExtensionsEnableHeight: ctx.BlockHeight() + int64(10), // enables VE's at current_height + 10.
        }
        _, err = app.ConsensusParamsKeeper.UpdateParams(ctx, &consensustypes.MsgUpdateParams{
            Authority: app.ConsensusParamsKeeper.GetAuthority(),
            Block:     consensusParams.Params.Block,
            Evidence:  consensusParams.Params.Evidence,
            Validator: consensusParams.Params.Validator,
            Abci:      consensusParams.Params.Abci,
        })
        if err != nil {
            return nil, err
        }

		// add the markets to the chain state.
		err := app.setupMarkets(ctx)
		if err != nil {
			return migrations, err
		}

		return migrations, nil
	})

	upgradeInfo, err := app.UpgradeKeeper.ReadUpgradeInfoFromDisk()
	if err != nil {
		panic(fmt.Errorf("failed to read upgrade info from disk: %w", err))
	}


	// add the x/marketmap and x/oracle stores.
	if upgradeInfo.Name == connectUpgradeName {
        app.SetStoreLoader(
            upgradetypes.UpgradeStoreLoader(
                upgradeInfo.Height,
                &storetypes.StoreUpgrades{
                    Added:   []string{marketmaptypes.ModuleName, oracletypes.ModuleName},
                    Renamed: nil,
                    Deleted: nil,
                },
            ),
        )
	}

	// ...
}

Running the Node

Once the chain is properly configured, head over to the Quickstart guide to learn how to start the node with a Connect sidecar.

Need Help?

Need help with your integration? Feel free to reach out to us on Discord.