# Source code for mythx_cli.cli

"""The main runtime of the MythX CLI."""

import logging
import sys
import time
from glob import glob
from pathlib import Path

import click
from mythx_models.response import AnalysisListResponse
from pythx import Client, MythXAPIError
from pythx.middleware.toolname import ClientToolNameMiddleware

from mythx_cli import __version__
from mythx_cli.formatter import FORMAT_RESOLVER
from mythx_cli.payload import (
    generate_bytecode_payload,
    generate_solidity_payload,
    generate_truffle_payload,
)

# Module-level logger used by the commands in this file for debug output.
LOGGER = logging.getLogger("mythx-cli")
# Default the root logger to WARNING; the `cli` group raises every known
# logger to DEBUG when the --debug flag (or MYTHX_DEBUG) is set.
logging.basicConfig(level=logging.WARNING)


@click.group()
@click.option(
    "--debug/--no-debug",
    default=False,
    envvar="MYTHX_DEBUG",
    help="Provide additional debug output",
)
@click.option(
    "--access-token",
    envvar="MYTHX_ACCESS_TOKEN",
    help="Your access token generated from the MythX dashboard",
)
@click.option(
    "--eth-address",
    envvar="MYTHX_ETH_ADDRESS",
    help="Your MythX account's Ethereum address",
)
@click.option(
    "--password",
    envvar="MYTHX_PASSWORD",
    help="Your MythX account's password as set in the dashboard",
)
@click.option(
    "--staging/--production",
    default=False,
    hidden=True,
    envvar="MYTHX_STAGING",
    help="Use the MythX staging environment",
)
@click.option(
    "--format",
    "fmt",
    default="simple",
    type=click.Choice(FORMAT_RESOLVER.keys()),
    help="The format to display the results in",
)
@click.pass_context
def cli(ctx, **kwargs):
    """Your CLI for interacting with https://mythx.io/
    \f

    Group callback shared by all subcommands. It copies the group-level
    options into ``ctx.obj`` and stores a configured API client under the
    ``"client"`` key. Credentials are picked in this order: an access token
    (whenever one is given, i.e. not ``None``), then an ETH address together
    with a password, and finally the built-in trial account.

    :param ctx: Click context holding group-level parameters
    :param debug: Boolean to enable the `logging` debug mode
    :param access_token: User JWT access token from the MythX dashboard
    :param eth_address: The MythX account ETH address/username
    :param password: The account password from the MythX dashboard
    :param staging: Boolean to redirect requests to MythX staging
    :param fmt: The formatter to use for the subcommand output
    :return: Always ``0``
    """

    ctx.obj = dict(kwargs)

    # Settings that are identical no matter which credentials are used.
    shared = {
        "staging": kwargs["staging"],
        "middlewares": [
            ClientToolNameMiddleware(name="mythx-cli-{}".format(__version__))
        ],
    }

    token = kwargs["access_token"]
    address = kwargs["eth_address"]
    secret = kwargs["password"]
    if token is not None:
        credentials = {"access_token": token}
    elif address and secret:
        credentials = {"eth_address": address, "password": secret}
    else:
        # default to trial user client
        credentials = {
            "eth_address": "0x0000000000000000000000000000000000000000",
            "password": "trial",
        }
    ctx.obj["client"] = Client(**credentials, **shared)

    if kwargs["debug"]:
        # Switch every logger registered so far to DEBUG verbosity.
        for logger_name in logging.root.manager.loggerDict:
            logging.getLogger(logger_name).setLevel(logging.DEBUG)

    return 0


def find_truffle_artifacts(project_dir):
    """Look for a Truffle build folder and return all relevant JSON artifacts.

    This function will skip the Migrations.json file and return all other
    files under :code:`<project-dir>/build/contracts/`. If no relevant files
    were found (the folder is missing, empty, or only holds Migrations.json),
    :code:`None` is returned.

    :param project_dir: The base directory of the Truffle project
    :return: Absolute paths (as strings) of the files under
        :code:`<project-dir>/build/contracts/` or :code:`None`
    """
    output_pattern = Path(project_dir) / "build" / "contracts" / "*.json"
    artifact_files = [
        f
        for f in glob(str(output_pattern.absolute()))
        # Compare the exact file name so that artifacts which merely end in
        # "Migrations.json" (e.g. TokenMigrations.json) are not dropped.
        if Path(f).name != "Migrations.json"
    ]
    # Normalise "nothing usable" to None, as documented, even when the only
    # match was the skipped Migrations.json file.
    return artifact_files or None
def find_solidity_files(project_dir):
    """Return all Solidity files in the given directory.

    This will match all files with the `.sol` extension directly inside
    `project_dir`; subdirectories are not searched.

    :param project_dir: The directory to search in
    :return: Absolute paths (as strings) of the Solidity files in
        `project_dir` or `None` if there are none
    """
    output_pattern = Path(project_dir) / "*.sol"
    # glob() already returns a list, so no extra copy is needed.
    artifact_files = glob(str(output_pattern.absolute()))
    return artifact_files or None
@cli.command()
@click.argument(
    "target", default=None, nargs=-1, required=False  # allow multiple targets
)
@click.option(
    "--async/--wait",  # TODO: make default on full
    "async_flag",
    help="Submit the job and print the UUID, or wait for execution to finish",
)
@click.option("--mode", type=click.Choice(["quick", "full"]), default="quick")
@click.pass_obj
def analyze(ctx, target, async_flag, mode):
    """Analyze the given directory or arguments with MythX.
    \f

    Without targets, the current working directory is inspected: if a
    `truffle-config.js` or `truffle.js` file exists, the Truffle artifacts
    are submitted; otherwise, after a confirmation prompt, every `.sol`
    file in the directory is submitted. With targets, each one starting
    with `0x` is treated as bytecode and each existing `.sol` file as
    Solidity source.

    Every job is submitted with the chosen analysis mode. With the async
    flag only the job UUIDs are printed; otherwise each job is polled
    until it is ready and its formatted report is printed.

    :param ctx: Click context holding group-level parameters
    :param target: Arguments passed to the `analyze` subcommand
    :param async_flag: Whether to execute the analysis asynchronously
    :param mode: Full or quick analysis mode
    :raises click.exceptions.UsageError: If nothing analyzable is found or
        a target cannot be interpreted
    :raises click.Abort: If the user declines the confirmation prompt
    :return:
    """

    jobs = []

    if not target:
        if Path("truffle-config.js").exists() or Path("truffle.js").exists():
            files = find_truffle_artifacts(Path.cwd())
            if not files:
                raise click.exceptions.UsageError(
                    "Could not find any truffle artifacts. Are you in the project root? Did you run truffle compile?"
                )
            LOGGER.debug(
                "Detected Truffle project with files:\n{}".format("\n".join(files))
            )
            for file in files:
                jobs.append(generate_truffle_payload(file))

        elif glob("*.sol"):
            files = find_solidity_files(Path.cwd())
            # abort=True makes a negative answer stop the command instead of
            # being silently ignored and submitting the files anyway.
            click.confirm(
                "Do you really want to submit {} Solidity files?".format(len(files)),
                abort=True,
            )
            LOGGER.debug("Found Solidity files to submit:\n{}".format("\n".join(files)))
            for file in files:
                jobs.append(generate_solidity_payload(file))
        else:
            raise click.exceptions.UsageError(
                "No argument given and unable to detect Truffle project or Solidity files"
            )
    else:
        for target_elem in target:
            if target_elem.startswith("0x"):
                LOGGER.debug("Identified target {} as bytecode".format(target_elem))
                jobs.append(generate_bytecode_payload(target_elem))
            elif Path(target_elem).is_file() and Path(target_elem).suffix == ".sol":
                LOGGER.debug(
                    "Trying to interpret {} as a solidity file".format(target_elem)
                )
                jobs.append(generate_solidity_payload(target_elem))
            else:
                raise click.exceptions.UsageError(
                    "Could not interpret argument {} as bytecode or Solidity file".format(
                        target_elem
                    )
                )

    uuids = []
    with click.progressbar(jobs) as bar:
        for job in bar:
            # attach execution mode, submit, poll
            job.update({"analysis_mode": mode})
            resp = ctx["client"].analyze(**job)
            uuids.append(resp.uuid)

    if async_flag:
        click.echo("\n".join(uuids))
        return

    for uuid in uuids:
        while not ctx["client"].analysis_ready(uuid):
            # TODO: Add poll interval option
            time.sleep(3)
        resp = ctx["client"].report(uuid)
        inp = ctx["client"].request_by_uuid(uuid)

        # The UUID is stored on the shared context object before formatting.
        ctx["uuid"] = uuid
        click.echo(FORMAT_RESOLVER[ctx["fmt"]].format_detected_issues(resp, inp))


@cli.command()
@click.argument("uuids", default=None, nargs=-1)
@click.pass_obj
def status(ctx, uuids):
    """Get the status of an already submitted analysis.
    \f

    :param ctx: Click context holding group-level parameters
    :param uuids: A list of job UUIDs to fetch the status for
    """
    for uuid in uuids:
        resp = ctx["client"].status(uuid)
        click.echo(FORMAT_RESOLVER[ctx["fmt"]].format_analysis_status(resp))


@cli.command(name="list")
@click.option(
    "--number",
    default=5,
    type=click.IntRange(min=1, max=100),  # ~ 5 requests of 20 entries each
    help="The number of most recent analysis jobs to display",
)
@click.pass_obj
def list_(ctx, number):
    """Get a list of submitted analyses.
    \f

    Pages through the analysis list until `number` entries have been
    collected or the API returns an empty page, then prints the formatted
    result.

    :param ctx: Click context holding group-level parameters
    :param number: The number of analysis jobs to display
    :raises click.UsageError: If the API rejects the request
    :return:
    """
    result = AnalysisListResponse(analyses=[], total=0)
    try:
        offset = 0
        while True:
            resp = ctx["client"].analysis_list(offset=offset)
            if not resp.analyses:
                # No more analyses available. Without this check an account
                # with fewer than `number` analyses would loop forever,
                # because the offset would never advance.
                break
            offset += len(resp.analyses)
            result.analyses.extend(resp.analyses)
            if len(result.analyses) >= number:
                break

        # trim result to desired result number
        LOGGER.debug(resp.total)
        result = AnalysisListResponse(
            analyses=result.analyses[:number], total=resp.total
        )
    except MythXAPIError:
        raise click.UsageError(
            (
                "This functionality is only available to registered users. "
                "Head over to https://mythx.io/ and register a free account to "
                "list your past analyses. Alternatively, you can look up the "
                "status of a specific job by calling 'mythx status <uuid>'."
            )
        )
    click.echo(FORMAT_RESOLVER[ctx["fmt"]].format_analysis_list(result))


@cli.command()
@click.argument("uuids", default=None, nargs=-1)
@click.pass_obj
def report(ctx, uuids):
    """Fetch the report for a single or multiple job UUIDs.
    \f

    :param ctx: Click context holding group-level parameters
    :param uuids: List of UUIDs to display the report for
    :return:
    """
    for uuid in uuids:
        resp = ctx["client"].report(uuid)
        inp = ctx["client"].request_by_uuid(uuid)
        # The UUID is stored on the shared context object before formatting.
        ctx["uuid"] = uuid
        click.echo(FORMAT_RESOLVER[ctx["fmt"]].format_detected_issues(resp, inp))


@cli.command()
@click.pass_obj
def version(ctx):
    """Display API version information.
    \f

    :param ctx: Click context holding group-level parameters
    :return:
    """
    resp = ctx["client"].version()
    click.echo(FORMAT_RESOLVER[ctx["fmt"]].format_version(resp))


if __name__ == "__main__":
    sys.exit(cli())  # pragma: no cover