Skip to content

API Documentation

Shadowserver API abstraction library.

API documentation is available at: https://www.shadowserver.org/what-we-do/network-reporting/api-documentation/

ShadowServerAPI

Source code in shadowserver_api/api.py
class ShadowServerAPI:
  def __init__(self, api_url: str, api_key: str, api_secret: str, timeout: int = 45) -> None:
    self.api_url = api_url
    self.api_key = api_key
    self.api_secret_bytes = api_secret.encode()
    self.session = httpx.Client(timeout=timeout)
    self.logger = logging.getLogger(__name__)

  def _generate_hmac(self, request_bytes: bytes) -> str:
    hmac_generator = hmac.new(self.api_secret_bytes, request_bytes, hashlib.sha256)
    return hmac_generator.hexdigest()

  def api_call(self, method: str, request_data: dict[str, typing.Any]) -> typing.Any:
    """Call the specified api method with a request dictionary."""
    url = f'{self.api_url}{method}'

    self.logger.debug('URL: %s', url)
    self.logger.debug('DATA: %s', json.dumps(request_data, indent=2))

    request_data['apikey'] = self.api_key
    request_bytes = json.dumps(request_data).encode()
    request_hmac = self._generate_hmac(request_bytes)

    response = self.session.post(url, content=request_bytes, headers={'HMAC2': request_hmac})

    if response.status_code != 200:
      try:
        response_json = response.json()
        raise InvalidRequest(response_json['error'])
      except (ValueError, KeyError, json.decoder.JSONDecodeError) as exc:
        if 'No supported report filters found' in response.text:
          raise NoSupportedReportFilter(response.text) from exc

        raise InvalidRequest(response.text) from exc

    try:
      return response.json()
    except ValueError as exc:
      raise InvalidResponse(response.text) from exc

  def api_test_ping(self) -> dict[str, typing.Any]:
    """Check your connection to the API server."""
    result = self.api_call('test/ping', {})

    if not isinstance(result, dict):
      raise ValueError('Invalid return value from api_call')

    return result

  def api_key_info(self) -> dict[str, typing.Any]:
    """Returns details about your apikey."""
    result = self.api_call('key/info', {})

    if not isinstance(result, list):
      raise ValueError('Invalid return value from api_call')

    if len(result) != 1:
      raise ValueError('Invalid return value from api_call')

    return result[0]

  def api_reports_subscribed(self) -> list[str]:
    """List of reports that the user is subscribed to."""
    result = self.api_call('reports/subscribed', {})

    if not isinstance(result, list):
      raise ValueError('Invalid return value from api_call')

    return result

  def api_reports_types(self) -> list[str]:
    """List of all the types of reports that are available for the subscriber."""
    result = self.api_call('reports/types', {})

    if not isinstance(result, list):
      raise ValueError('Invalid return value from api_call')

    return result

  def api_reports_list(
    self,
    reports: list[str] | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    report_type: str | None = None,
    limit: int | None = None,
  ) -> list[dict[str, typing.Any]]:
    """List of actual reports that could be downloaded."""
    request_data: dict[str, typing.Any] = {}

    if reports:
      request_data['reports'] = reports

    if limit:
      request_data['limit'] = limit

    if start_date:
      if not regex_date.match(start_date):
        raise ValueError('Invalid start_date format.')

      if end_date:
        if not regex_date.match(end_date):
          raise ValueError('Invalid end_date format.')

        request_data['date'] = f'{start_date}:{end_date}'
      else:
        request_data['date'] = f'{start_date}'

    if report_type:
      request_data['type'] = report_type

    result = self.api_call('reports/list', request_data)

    if not isinstance(result, list):
      raise ValueError('Invalid return value from api_call')

    return result

  def api_reports_download(self, report_id: str) -> list[dict[str, typing.Any]]:
    """Download specific report."""
    download_url = f'https://dl.shadowserver.org/{report_id}'
    res = self.session.get(download_url)

    raw = io.StringIO(res.text)
    csv_reader = csv.DictReader(raw)
    data = list(csv_reader)

    return data

  def api_reports_query(
    self,
    query: dict[str, str],
    sort: str = 'ascending',
    start_date: str | None = None,
    end_date: str | None = None,
    facet: str | None = None,
    limit: int = 1000,
    pagination: bool = True,
  ) -> typing.Iterator[dict[str, typing.Any]]:
    """Do a reports query.

    Args:
      query: The query must contain one attribute that matches your organization’s report filters. See https://www.shadowserver.org/what-we-do/network-reporting/api-reports-query/ for details.
      sort: Sort the output. Valid values are "ascending", "descending".
      start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
      end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
      facet: Returns the cardinality of each value of the given field sorted from highest to lowest.
      limit: Number of records to pull.
      pagination: If set to True, automatically pulls all results using pagination.
    """
    if sort not in ('ascending', 'descending'):
      raise ValueError('"sort" parameter must be one of: "ascending", "descending"')

    request_data = {
      'query': query,
      'sort': sort,
      'page': '1',
    }

    if start_date is not None:
      if not regex_date.match(start_date):
        raise ValueError('Invalid start_date format.')

      if end_date is not None:
        if not regex_date.match(end_date):
          raise ValueError('Invalid end_date format.')

        request_data['date'] = f'{start_date}:{end_date}'
      else:
        request_data['date'] = f'{start_date}'

    if facet:
      request_data['facet'] = facet

    if limit:
      request_data['limit'] = str(limit)

    result = self.api_call('reports/query', request_data)

    if not isinstance(result, list):
      raise ValueError('Invalid return value from api_call')

    for res in result:
      yield res

    # only do automatic pagination if page parameter was set
    if pagination:
      _page = 1

      while result:
        _page += 1
        request_data['page'] = str(_page)
        result = self.api_call('reports/query', request_data)

        if not isinstance(result, list):
          raise ValueError('Invalid return value from api_call')

        for res in result:
          yield res

  def api_reports_stats(
    self,
    start_date: str | None = None,
    end_date: str | None = None,
    report: typing.Sequence[str] | None = None,
    report_type: typing.Sequence[str] | None = None,
  ) -> list[tuple[str, str, str]]:
    """Fetch report statistics.

    Args:
      start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
      end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
      report: Filter by report name.
      report_type: Filter by report type.
    """
    request_data: dict[str, typing.Any] = {}

    if report:
      request_data['report'] = report

    if start_date is not None:
      if not regex_date.match(start_date):
        raise ValueError('Invalid start_date format.')

      if end_date is not None:
        if not regex_date.match(end_date):
          raise ValueError('Invalid end_date format.')

        request_data['date'] = f'{start_date}:{end_date}'
      else:
        request_data['date'] = f'{start_date}'

    if report_type:
      request_data['type'] = report_type

    result = self.api_call('reports/stats', request_data)

    if not isinstance(result, list):
      raise ValueError('Invalid return value from api_call')

    return result

  def api_reports_device_info(
    self,
    query: dict[str, str],
  ) -> dict[str, typing.Any]:
    """Do a reports device-info query, which provides device details for a given IP.

    Args:
      query: Dictionary containing search criteria. Note that the query must be limited to your organisation's report scope.
    """
    for k in query.keys():
      if k not in ('ip', 'asn', 'geo'):
        raise ValueError('query parameter may only contain the following keys: "ip", "asn", "geo".')

    request_data = {'query': query}

    result = self.api_call('reports/device-info', request_data)

    return result

  def api_reports_schema(self, report_type: str) -> dict[str, typing.Any]:
    """Return a schema for a given report type.

    Args:
      report_type: Report type to get the schema for.
    """
    request_data = {'type': report_type}

    result = self.api_call('reports/schema', request_data)

    return result

api_call(method, request_data)

Call the specified api method with a request dictionary.

Source code in shadowserver_api/api.py
def api_call(self, method: str, request_data: dict[str, typing.Any]) -> typing.Any:
  """Call the specified api method with a request dictionary."""
  url = f'{self.api_url}{method}'

  self.logger.debug('URL: %s', url)
  self.logger.debug('DATA: %s', json.dumps(request_data, indent=2))

  request_data['apikey'] = self.api_key
  request_bytes = json.dumps(request_data).encode()
  request_hmac = self._generate_hmac(request_bytes)

  response = self.session.post(url, content=request_bytes, headers={'HMAC2': request_hmac})

  if response.status_code != 200:
    try:
      response_json = response.json()
      raise InvalidRequest(response_json['error'])
    except (ValueError, KeyError, json.decoder.JSONDecodeError) as exc:
      if 'No supported report filters found' in response.text:
        raise NoSupportedReportFilter(response.text) from exc

      raise InvalidRequest(response.text) from exc

  try:
    return response.json()
  except ValueError as exc:
    raise InvalidResponse(response.text) from exc

api_key_info()

Returns details about your apikey.

Source code in shadowserver_api/api.py
def api_key_info(self) -> dict[str, typing.Any]:
  """Returns details about your apikey."""
  result = self.api_call('key/info', {})

  if not isinstance(result, list):
    raise ValueError('Invalid return value from api_call')

  if len(result) != 1:
    raise ValueError('Invalid return value from api_call')

  return result[0]

api_reports_device_info(query)

Do a reports device-info query, which provides device details for a given IP.

Parameters:

Name Type Description Default
query dict[str, str]

Dictionary containing search criteria. Note that the query must be limited to your organisation's report scope.

required
Source code in shadowserver_api/api.py
def api_reports_device_info(
  self,
  query: dict[str, str],
) -> dict[str, typing.Any]:
  """Do a reports device-info query, which provides device details for a given IP.

  Args:
    query: Dictionary containing search criteria. Note that the query must be limited to your organisation's report scope.
  """
  for k in query.keys():
    if k not in ('ip', 'asn', 'geo'):
      raise ValueError('query parameter may only contain the following keys: "ip", "asn", "geo".')

  request_data = {'query': query}

  result = self.api_call('reports/device-info', request_data)

  return result

api_reports_download(report_id)

Download specific report.

Source code in shadowserver_api/api.py
def api_reports_download(self, report_id: str) -> list[dict[str, typing.Any]]:
  """Download specific report."""
  download_url = f'https://dl.shadowserver.org/{report_id}'
  res = self.session.get(download_url)

  raw = io.StringIO(res.text)
  csv_reader = csv.DictReader(raw)
  data = list(csv_reader)

  return data

api_reports_list(reports=None, start_date=None, end_date=None, report_type=None, limit=None)

List of actual reports that could be downloaded.

Source code in shadowserver_api/api.py
def api_reports_list(
  self,
  reports: list[str] | None = None,
  start_date: str | None = None,
  end_date: str | None = None,
  report_type: str | None = None,
  limit: int | None = None,
) -> list[dict[str, typing.Any]]:
  """List of actual reports that could be downloaded."""
  request_data: dict[str, typing.Any] = {}

  if reports:
    request_data['reports'] = reports

  if limit:
    request_data['limit'] = limit

  if start_date:
    if not regex_date.match(start_date):
      raise ValueError('Invalid start_date format.')

    if end_date:
      if not regex_date.match(end_date):
        raise ValueError('Invalid end_date format.')

      request_data['date'] = f'{start_date}:{end_date}'
    else:
      request_data['date'] = f'{start_date}'

  if report_type:
    request_data['type'] = report_type

  result = self.api_call('reports/list', request_data)

  if not isinstance(result, list):
    raise ValueError('Invalid return value from api_call')

  return result

api_reports_query(query, sort='ascending', start_date=None, end_date=None, facet=None, limit=1000, pagination=True)

Do a reports query.

Parameters:

Name Type Description Default
query dict[str, str]

The query must contain one attribute that matches your organization’s report filters. See https://www.shadowserver.org/what-we-do/network-reporting/api-reports-query/ for details.

required
sort str

Sort the output. Valid values are "ascending", "descending".

'ascending'
start_date str | None

Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".

None
end_date str | None

Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".

None
facet str | None

Returns the cardinality of each value of the given field sorted from highest to lowest.

None
limit int

Number of records to pull.

1000
pagination bool

If set to True, automatically pulls all results using pagination.

True
Source code in shadowserver_api/api.py
def api_reports_query(
  self,
  query: dict[str, str],
  sort: str = 'ascending',
  start_date: str | None = None,
  end_date: str | None = None,
  facet: str | None = None,
  limit: int = 1000,
  pagination: bool = True,
) -> typing.Iterator[dict[str, typing.Any]]:
  """Do a reports query.

  Args:
    query: The query must contain one attribute that matches your organization’s report filters. See https://www.shadowserver.org/what-we-do/network-reporting/api-reports-query/ for details.
    sort: Sort the output. Valid values are "ascending", "descending".
    start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
    end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
    facet: Returns the cardinality of each value of the given field sorted from highest to lowest.
    limit: Number of records to pull.
    pagination: If set to True, automatically pulls all results using pagination.
  """
  if sort not in ('ascending', 'descending'):
    raise ValueError('"sort" parameter must be one of: "ascending", "descending"')

  request_data = {
    'query': query,
    'sort': sort,
    'page': '1',
  }

  if start_date is not None:
    if not regex_date.match(start_date):
      raise ValueError('Invalid start_date format.')

    if end_date is not None:
      if not regex_date.match(end_date):
        raise ValueError('Invalid end_date format.')

      request_data['date'] = f'{start_date}:{end_date}'
    else:
      request_data['date'] = f'{start_date}'

  if facet:
    request_data['facet'] = facet

  if limit:
    request_data['limit'] = str(limit)

  result = self.api_call('reports/query', request_data)

  if not isinstance(result, list):
    raise ValueError('Invalid return value from api_call')

  for res in result:
    yield res

  # only do automatic pagination if page parameter was set
  if pagination:
    _page = 1

    while result:
      _page += 1
      request_data['page'] = str(_page)
      result = self.api_call('reports/query', request_data)

      if not isinstance(result, list):
        raise ValueError('Invalid return value from api_call')

      for res in result:
        yield res

api_reports_schema(report_type)

Return a schema for a given report type.

Parameters:

Name Type Description Default
report_type str

Report type to get the schema for.

required
Source code in shadowserver_api/api.py
def api_reports_schema(self, report_type: str) -> dict[str, typing.Any]:
  """Return a schema for a given report type.

  Args:
    report_type: Report type to get the schema for.
  """
  request_data = {'type': report_type}

  result = self.api_call('reports/schema', request_data)

  return result

api_reports_stats(start_date=None, end_date=None, report=None, report_type=None)

Fetch report statistics.

Parameters:

Name Type Description Default
start_date str | None

Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".

None
end_date str | None

Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".

None
report Sequence[str] | None

Filter by report name.

None
report_type Sequence[str] | None

Filter by report type.

None
Source code in shadowserver_api/api.py
def api_reports_stats(
  self,
  start_date: str | None = None,
  end_date: str | None = None,
  report: typing.Sequence[str] | None = None,
  report_type: typing.Sequence[str] | None = None,
) -> list[tuple[str, str, str]]:
  """Fetch report statistics.

  Args:
    start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
    end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
    report: Filter by report name.
    report_type: Filter by report type.
  """
  request_data: dict[str, typing.Any] = {}

  if report:
    request_data['report'] = report

  if start_date is not None:
    if not regex_date.match(start_date):
      raise ValueError('Invalid start_date format.')

    if end_date is not None:
      if not regex_date.match(end_date):
        raise ValueError('Invalid end_date format.')

      request_data['date'] = f'{start_date}:{end_date}'
    else:
      request_data['date'] = f'{start_date}'

  if report_type:
    request_data['type'] = report_type

  result = self.api_call('reports/stats', request_data)

  if not isinstance(result, list):
    raise ValueError('Invalid return value from api_call')

  return result

api_reports_subscribed()

List of reports that the user is subscribed to.

Source code in shadowserver_api/api.py
def api_reports_subscribed(self) -> list[str]:
  """List of reports that the user is subscribed to."""
  result = self.api_call('reports/subscribed', {})

  if not isinstance(result, list):
    raise ValueError('Invalid return value from api_call')

  return result

api_reports_types()

List of all the types of reports that are available for the subscriber.

Source code in shadowserver_api/api.py
def api_reports_types(self) -> list[str]:
  """List of all the types of reports that are available for the subscriber."""
  result = self.api_call('reports/types', {})

  if not isinstance(result, list):
    raise ValueError('Invalid return value from api_call')

  return result

api_test_ping()

Check your connection to the API server.

Source code in shadowserver_api/api.py
def api_test_ping(self) -> dict[str, typing.Any]:
  """Check your connection to the API server."""
  result = self.api_call('test/ping', {})

  if not isinstance(result, dict):
    raise ValueError('Invalid return value from api_call')

  return result

Shadowserver API abstraction library.

API documentation is available at: https://www.shadowserver.org/what-we-do/network-reporting/api-documentation/

AsyncShadowServerAPI

Source code in shadowserver_api/async_api.py
class AsyncShadowServerAPI:
  def __init__(self, api_url: str, api_key: str, api_secret: str, timeout: int = 45) -> None:
    self.api_url = api_url
    self.api_key = api_key
    self.api_secret_bytes = api_secret.encode()
    self.session = httpx.AsyncClient(timeout=timeout)
    self.logger = logging.getLogger(__name__)

  def _generate_hmac(self, request_bytes: bytes) -> str:
    hmac_generator = hmac.new(self.api_secret_bytes, request_bytes, hashlib.sha256)
    return hmac_generator.hexdigest()

  async def api_call(self, method: str, request_data: dict[str, typing.Any]) -> typing.Any:
    """Call the specified api method with a request dictionary."""
    url = f'{self.api_url}{method}'

    self.logger.debug('URL: %s', url)
    self.logger.debug('DATA: %s', json.dumps(request_data, indent=2))

    request_data['apikey'] = self.api_key
    request_bytes = json.dumps(request_data).encode()
    request_hmac = self._generate_hmac(request_bytes)

    response = await self.session.post(url, content=request_bytes, headers={'HMAC2': request_hmac})

    if response.status_code != 200:
      try:
        response_json = response.json()
        raise InvalidRequest(response_json['error'])
      except (ValueError, KeyError, json.decoder.JSONDecodeError) as exc:
        if 'No supported report filters found' in response.text:
          raise NoSupportedReportFilter(response.text) from exc

        raise InvalidRequest(response.text) from exc

    try:
      return response.json()
    except ValueError as exc:
      raise InvalidResponse(response.text) from exc

  async def api_test_ping(self) -> dict[str, typing.Any]:
    """Check your connection to the API server."""
    result = await self.api_call('test/ping', {})

    if not isinstance(result, dict):
      raise ValueError('Invalid return value from api_call')

    return result

  async def api_key_info(self) -> dict[str, typing.Any]:
    """Returns details about your apikey."""
    result = await self.api_call('key/info', {})

    if not isinstance(result, list):
      raise ValueError('Invalid return value from api_call')

    if len(result) != 1:
      raise ValueError('Invalid return value from api_call')

    return result[0]

  async def api_reports_subscribed(self) -> list[str]:
    """List of reports that the user is subscribed to."""
    result = await self.api_call('reports/subscribed', {})

    if not isinstance(result, list):
      raise ValueError('Invalid return value from api_call')

    return result

  async def api_reports_types(self) -> list[str]:
    """List of all the types of reports that are available for the subscriber."""
    result = await self.api_call('reports/types', {})

    if not isinstance(result, list):
      raise ValueError('Invalid return value from api_call')

    return result

  async def api_reports_list(
    self,
    reports: list[str] | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    report_type: str | None = None,
    limit: int | None = None,
  ) -> list[dict[str, typing.Any]]:
    """List of actual reports that could be downloaded."""
    request_data: dict[str, typing.Any] = {}

    if reports:
      request_data['reports'] = reports

    if limit:
      request_data['limit'] = limit

    if start_date:
      if not regex_date.match(start_date):
        raise ValueError('Invalid start_date format.')

      if end_date:
        if not regex_date.match(end_date):
          raise ValueError('Invalid end_date format.')

        request_data['date'] = f'{start_date}:{end_date}'
      else:
        request_data['date'] = f'{start_date}'

    if report_type:
      request_data['type'] = report_type

    result = await self.api_call('reports/list', request_data)

    if not isinstance(result, list):
      raise ValueError('Invalid return value from api_call')

    return result

  async def api_reports_download(self, report_id: str) -> list[dict[str, typing.Any]]:
    """Download specific report."""
    download_url = f'https://dl.shadowserver.org/{report_id}'
    res = await self.session.get(download_url)

    raw = io.StringIO(res.text)
    csv_reader = csv.DictReader(raw)
    data = list(csv_reader)

    return data

  async def api_reports_query(
    self,
    query: dict[str, str],
    sort: str = 'ascending',
    start_date: str | None = None,
    end_date: str | None = None,
    facet: str | None = None,
    limit: int = 1000,
    pagination: bool = True,
  ) -> typing.AsyncIterator[dict[str, typing.Any]]:
    """Do a reports query.

    Args:
      query: The query must contain one attribute that matches your organization’s report filters. See https://www.shadowserver.org/what-we-do/network-reporting/api-reports-query/ for details.
      sort: Sort the output. Valid values are "ascending", "descending".
      start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
      end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
      facet: Returns the cardinality of each value of the given field sorted from highest to lowest.
      limit: Number of records to pull.
      pagination: If set to True, automatically pulls all results using pagination.
    """
    if sort not in ('ascending', 'descending'):
      raise ValueError('"sort" parameter must be one of: "ascending", "descending"')

    request_data = {
      'query': query,
      'sort': sort,
      'page': '1',
    }

    if start_date is not None:
      if not regex_date.match(start_date):
        raise ValueError('Invalid start_date format.')

      if end_date is not None:
        if not regex_date.match(end_date):
          raise ValueError('Invalid end_date format.')

        request_data['date'] = f'{start_date}:{end_date}'
      else:
        request_data['date'] = f'{start_date}'

    if facet:
      request_data['facet'] = facet

    if limit:
      request_data['limit'] = str(limit)

    result = await self.api_call('reports/query', request_data)

    if not isinstance(result, list):
      raise ValueError('Invalid return value from api_call')

    for res in result:
      yield res

    # only do automatic pagination if page parameter was set
    if pagination:
      _page = 1

      while result:
        _page += 1
        request_data['page'] = str(_page)
        result = self.api_call('reports/query', request_data)

        if not isinstance(result, list):
          raise ValueError('Invalid return value from api_call')

        for res in result:
          yield res

  async def api_reports_stats(
    self,
    start_date: str | None = None,
    end_date: str | None = None,
    report: typing.Sequence[str] | None = None,
    report_type: typing.Sequence[str] | None = None,
  ) -> list[tuple[str, str, str]]:
    """Fetch report statistics.

    Args:
      start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
      end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
      report: Filter by report name.
      report_type: Filter by report type.
    """
    request_data: dict[str, typing.Any] = {}

    if report:
      request_data['report'] = report

    if start_date is not None:
      if not regex_date.match(start_date):
        raise ValueError('Invalid start_date format.')

      if end_date is not None:
        if not regex_date.match(end_date):
          raise ValueError('Invalid end_date format.')

        request_data['date'] = f'{start_date}:{end_date}'
      else:
        request_data['date'] = f'{start_date}'

    if report_type:
      request_data['type'] = report_type

    result = await self.api_call('reports/stats', request_data)

    if not isinstance(result, list):
      raise ValueError('Invalid return value from api_call')

    return result

  async def api_reports_device_info(
    self,
    query: dict[str, str],
  ) -> dict[str, typing.Any]:
    """Do a reports device-info query, which provides device details for a given IP.

    Args:
      query: Dictionary containing search criteria. Note that the query must be limited to your organisation's report scope.
    """
    for k in query.keys():
      if k not in ('ip', 'asn', 'geo'):
        raise ValueError('query parameter may only contain the following keys: "ip", "asn", "geo".')

    request_data = {'query': query}

    result = await self.api_call('reports/device-info', request_data)

    return result

  async def api_reports_schema(self, report_type: str) -> dict[str, typing.Any]:
    """Return a schema for a given report type.

    Args:
      report_type: Report type to get the schema for.
    """
    request_data = {'type': report_type}

    result = await self.api_call('reports/schema', request_data)

    return result

api_call(method, request_data) async

Call the specified api method with a request dictionary.

Source code in shadowserver_api/async_api.py
async def api_call(self, method: str, request_data: dict[str, typing.Any]) -> typing.Any:
  """Call the specified api method with a request dictionary."""
  url = f'{self.api_url}{method}'

  self.logger.debug('URL: %s', url)
  self.logger.debug('DATA: %s', json.dumps(request_data, indent=2))

  request_data['apikey'] = self.api_key
  request_bytes = json.dumps(request_data).encode()
  request_hmac = self._generate_hmac(request_bytes)

  response = await self.session.post(url, content=request_bytes, headers={'HMAC2': request_hmac})

  if response.status_code != 200:
    try:
      response_json = response.json()
      raise InvalidRequest(response_json['error'])
    except (ValueError, KeyError, json.decoder.JSONDecodeError) as exc:
      if 'No supported report filters found' in response.text:
        raise NoSupportedReportFilter(response.text) from exc

      raise InvalidRequest(response.text) from exc

  try:
    return response.json()
  except ValueError as exc:
    raise InvalidResponse(response.text) from exc

api_key_info() async

Returns details about your apikey.

Source code in shadowserver_api/async_api.py
async def api_key_info(self) -> dict[str, typing.Any]:
  """Returns details about your apikey."""
  result = await self.api_call('key/info', {})

  if not isinstance(result, list):
    raise ValueError('Invalid return value from api_call')

  if len(result) != 1:
    raise ValueError('Invalid return value from api_call')

  return result[0]

api_reports_device_info(query) async

Do a reports device-info query, which provides device details for a given IP.

Parameters:

Name Type Description Default
query dict[str, str]

Dictionary containing search criteria. Note that the query must be limited to your organisation's report scope.

required
Source code in shadowserver_api/async_api.py
async def api_reports_device_info(
  self,
  query: dict[str, str],
) -> dict[str, typing.Any]:
  """Do a reports device-info query, which provides device details for a given IP.

  Args:
    query: Dictionary containing search criteria. Note that the query must be limited to your organisation's report scope.
  """
  for k in query.keys():
    if k not in ('ip', 'asn', 'geo'):
      raise ValueError('query parameter may only contain the following keys: "ip", "asn", "geo".')

  request_data = {'query': query}

  result = await self.api_call('reports/device-info', request_data)

  return result

api_reports_download(report_id) async

Download specific report.

Source code in shadowserver_api/async_api.py
async def api_reports_download(self, report_id: str) -> list[dict[str, typing.Any]]:
  """Download specific report."""
  download_url = f'https://dl.shadowserver.org/{report_id}'
  res = await self.session.get(download_url)

  raw = io.StringIO(res.text)
  csv_reader = csv.DictReader(raw)
  data = list(csv_reader)

  return data

api_reports_list(reports=None, start_date=None, end_date=None, report_type=None, limit=None) async

List of actual reports that could be downloaded.

Source code in shadowserver_api/async_api.py
async def api_reports_list(
  self,
  reports: list[str] | None = None,
  start_date: str | None = None,
  end_date: str | None = None,
  report_type: str | None = None,
  limit: int | None = None,
) -> list[dict[str, typing.Any]]:
  """List of actual reports that could be downloaded."""
  request_data: dict[str, typing.Any] = {}

  if reports:
    request_data['reports'] = reports

  if limit:
    request_data['limit'] = limit

  if start_date:
    if not regex_date.match(start_date):
      raise ValueError('Invalid start_date format.')

    if end_date:
      if not regex_date.match(end_date):
        raise ValueError('Invalid end_date format.')

      request_data['date'] = f'{start_date}:{end_date}'
    else:
      request_data['date'] = f'{start_date}'

  if report_type:
    request_data['type'] = report_type

  result = await self.api_call('reports/list', request_data)

  if not isinstance(result, list):
    raise ValueError('Invalid return value from api_call')

  return result

api_reports_query(query, sort='ascending', start_date=None, end_date=None, facet=None, limit=1000, pagination=True) async

Do a reports query.

Parameters:

Name Type Description Default
query dict[str, str]

The query must contain one attribute that matches your organization’s report filters. See https://www.shadowserver.org/what-we-do/network-reporting/api-reports-query/ for details.

required
sort str

Sort the output. Valid values are "ascending", "descending".

'ascending'
start_date str | None

Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".

None
end_date str | None

Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".

None
facet str | None

Returns the cardinality of each value of the given field sorted from highest to lowest.

None
limit int

Number of records to pull.

1000
pagination bool

If set to True, automatically pulls all results using pagination.

True
Source code in shadowserver_api/async_api.py
async def api_reports_query(
  self,
  query: dict[str, str],
  sort: str = 'ascending',
  start_date: str | None = None,
  end_date: str | None = None,
  facet: str | None = None,
  limit: int = 1000,
  pagination: bool = True,
) -> typing.AsyncIterator[dict[str, typing.Any]]:
  """Do a reports query.

  Args:
    query: The query must contain one attribute that matches your organization’s report filters. See https://www.shadowserver.org/what-we-do/network-reporting/api-reports-query/ for details.
    sort: Sort the output. Valid values are "ascending", "descending".
    start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
    end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
    facet: Returns the cardinality of each value of the given field sorted from highest to lowest.
    limit: Number of records to pull.
    pagination: If set to True, automatically pulls all results using pagination.
  """
  if sort not in ('ascending', 'descending'):
    raise ValueError('"sort" parameter must be one of: "ascending", "descending"')

  request_data = {
    'query': query,
    'sort': sort,
    'page': '1',
  }

  if start_date is not None:
    if not regex_date.match(start_date):
      raise ValueError('Invalid start_date format.')

    if end_date is not None:
      if not regex_date.match(end_date):
        raise ValueError('Invalid end_date format.')

      request_data['date'] = f'{start_date}:{end_date}'
    else:
      request_data['date'] = f'{start_date}'

  if facet:
    request_data['facet'] = facet

  if limit:
    request_data['limit'] = str(limit)

  result = await self.api_call('reports/query', request_data)

  if not isinstance(result, list):
    raise ValueError('Invalid return value from api_call')

  for res in result:
    yield res

  # only do automatic pagination if page parameter was set
  if pagination:
    _page = 1

    while result:
      _page += 1
      request_data['page'] = str(_page)
      result = self.api_call('reports/query', request_data)

      if not isinstance(result, list):
        raise ValueError('Invalid return value from api_call')

      for res in result:
        yield res

api_reports_schema(report_type) async

Return a schema for a given report type.

Parameters:

Name Type Description Default
report_type str

Report type to get the schema for.

required
Source code in shadowserver_api/async_api.py
async def api_reports_schema(self, report_type: str) -> dict[str, typing.Any]:
  """Return a schema for a given report type.

  Args:
    report_type: Report type to get the schema for.
  """
  request_data = {'type': report_type}

  result = await self.api_call('reports/schema', request_data)

  return result

api_reports_stats(start_date=None, end_date=None, report=None, report_type=None) async

Fetch report statistics.

Parameters:

Name Type Description Default
start_date str | None

Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".

None
end_date str | None

Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".

None
report Sequence[str] | None

Filter by report name.

None
report_type Sequence[str] | None

Filter by report type.

None
Source code in shadowserver_api/async_api.py
async def api_reports_stats(
  self,
  start_date: str | None = None,
  end_date: str | None = None,
  report: typing.Sequence[str] | None = None,
  report_type: typing.Sequence[str] | None = None,
) -> list[tuple[str, str, str]]:
  """Fetch report statistics.

  Args:
    start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
    end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
    report: Filter by report name.
    report_type: Filter by report type.
  """
  request_data: dict[str, typing.Any] = {}

  if report:
    request_data['report'] = report

  if start_date is not None:
    if not regex_date.match(start_date):
      raise ValueError('Invalid start_date format.')

    if end_date is not None:
      if not regex_date.match(end_date):
        raise ValueError('Invalid end_date format.')

      request_data['date'] = f'{start_date}:{end_date}'
    else:
      request_data['date'] = f'{start_date}'

  if report_type:
    request_data['type'] = report_type

  result = await self.api_call('reports/stats', request_data)

  if not isinstance(result, list):
    raise ValueError('Invalid return value from api_call')

  return result

api_reports_subscribed() async

List of reports that the user is subscribed to.

Source code in shadowserver_api/async_api.py
async def api_reports_subscribed(self) -> list[str]:
  """List of reports that the user is subscribed to."""
  result = await self.api_call('reports/subscribed', {})

  if not isinstance(result, list):
    raise ValueError('Invalid return value from api_call')

  return result

api_reports_types() async

List of all the types of reports that are available for the subscriber.

Source code in shadowserver_api/async_api.py
async def api_reports_types(self) -> list[str]:
  """List of all the types of reports that are available for the subscriber."""
  result = await self.api_call('reports/types', {})

  if not isinstance(result, list):
    raise ValueError('Invalid return value from api_call')

  return result

api_test_ping() async

Check your connection to the API server.

Source code in shadowserver_api/async_api.py
async def api_test_ping(self) -> dict[str, typing.Any]:
  """Check your connection to the API server."""
  result = await self.api_call('test/ping', {})

  if not isinstance(result, dict):
    raise ValueError('Invalid return value from api_call')

  return result