Module pinkfish.fetch

Fetch time series data.

Expand source code
Fetch time series data.

import datetime
import os
import sys
import warnings

import pandas as pd
from pandas_datareader._utils import RemoteDataError
import as pdr
import yfinance as yf

from pinkfish.pfstatistics import (
from pinkfish.stock_market_calendar import (
import pinkfish.utility as utility

# Override pandas_datareader with yfinance

# TIMESERIES (fetch, select, finalize)

def _get_cache_dir(dir_name):
    Get the data dir path.

    dir_name : str
        The leaf data dir name.

        Path to the data dir.
    base_dir = ''
        conf = utility.read_config()
        base_dir = conf['base_dir']
    except Exception as e:
        dir_name = os.path.join(base_dir, dir_name)

    if not os.path.exists(dir_name):
    return dir_name

def _adj_column_names(ts):
    """
    Make all column names lower case.

    ta-lib expects columns to be lower case.  To be consistent, the
    index is renamed to lowercase 'date' as well.  Spaces in column
    names are replaced with underscores.

    Parameters
    ----------
    ts : pd.DataFrame
        The timeseries of a symbol.  Its column labels must be strings.

    Returns
    -------
    pd.DataFrame
        The same `ts` object (modified in place) with adjusted column
        names and the index named 'date'.
    """
    # e.g. 'Adj Close' -> 'adj_close'
    ts.columns = [col.lower().replace(' ', '_') for col in ts.columns]
    ts.index.names = ['date']
    return ts

def fetch_timeseries(symbol, dir_name='data', use_cache=True, from_year=None):
    Read time series data.

    Use cached version if it exists and use_cache is True, otherwise
    retrieve, cache, then read.

    symbol : str
        The symbol for a security.
    dir_name : str, optional
        The leaf data dir name (default is 'data').
    use_cache: bool, optional
        True to use data cache.  False to retrieve from the internet 
        (default is True).
    from_year: int, optional
        The start year for timeseries retrieval (default is None,
        which implies that all the available data is retrieved).

        The timeseries of a symbol.
    if from_year is None:
        from_year = 1900 if not sys.platform.startswith('win') else 1971

    # Yahoo finance uses '-' where '.' is used in symbol names.
    symbol = symbol.replace('.', '-')
    symbol = symbol.upper()

    # pinkfish allows the use of a suffix starting with a '_',
    # like SPY_SHRT, so extract the symbol.
    symbol = symbol.split('_')[0]

    timeseries_cache = os.path.join(_get_cache_dir(dir_name), symbol + '.csv')

    if os.path.isfile(timeseries_cache) and use_cache:
            ts = pdr.get_data_yahoo(symbol, start=datetime.datetime(from_year, 1, 1), progress=False)
        except RemoteDataError as e:
        except Exception as e:
            ts.to_csv(timeseries_cache, encoding='utf-8')

    ts = pd.read_csv(timeseries_cache, index_col='Date', parse_dates=True)
    ts = _adj_column_names(ts)

    # Remove rows that have duplicated index.
    ts = ts[~ts.index.duplicated(keep='first')]
    return ts

def _adj_prices(ts):
    """
    Back adjust prices relative to adj_close for dividends and splits.

    Each of 'open', 'high', 'low' and 'close' is scaled by the ratio
    adj_close / close.

    Parameters
    ----------
    ts : pd.DataFrame
        The timeseries of a symbol.  Must contain the columns 'open',
        'high', 'low', 'close' and 'adj_close'.

    Returns
    -------
    pd.DataFrame
        The same `ts` object (modified in place) with adjusted prices.
    """
    # 'close' must be adjusted last: the other columns are scaled using
    # the unadjusted close as the divisor.
    for col in ('open', 'high', 'low', 'close'):
        ts[col] = ts[col] * ts['adj_close'] / ts['close']
    return ts

def select_tradeperiod(ts, start, end, use_adj=False,
    Select the trade period.

    First, remove rows that have zero values in price columns. Then,
    select a time slice of the data to trade from ts.  Back date a year
    to allow time for long term indicators, e.g. 200sma, to become valid.

    ts : pd.DataFrame
        The timeseries of a symbol.
    start : datetime.datetime
        The desired start date for the strategy.
    end : datetime.datetime
        The desired end date for the strategy.
    use_adj : bool, optional
        True to adjust prices for dividends and splits
        (default is False).
    use_continuous_calendar: bool, optional
        True if your timeseries has data for all seven days a week,
        and you want to backtest trading every day, including weekends.
        If this value is True, then `force_stock_market_calendar`
        is set to False (default is False).
    force_stock_market_calendar : bool, optional
        True forces use of stock market calendar on timeseries.
        Normally, you don't need to do this.  This setting is intended
        to transform a continuous timeseries into a weekday timeseries.
        If this value is True, then `use_continuous_calendar` is set
        to False (default is False).
    check_fields : list of str, optional {'high', 'low', 'open',
        'close', 'adj_close'}
        Fields to check for NaN values.  If a NaN value is found
        for one of these fields, that row is dropped
        (default is ['close']).

        The timeseries for specified start:end, optionally with prices
        adjusted.

    You should only set one of `use_continuous_calendar`=True or
    `force_stock_market_calendar`=True for a continuous timeseries.
    You should set neither of these to True if your timeseries is based
    on the stock market.
    columns = ['high', 'low', 'open', 'close']
    if use_adj:

    # Replace 0 value columns with NaN.
    ts[columns] = ts[ts[columns] > 0][columns]

    if use_continuous_calendar:
        force_stock_market_calendar = False
    if force_stock_market_calendar:
        use_continuous_calendar = False

    if use_continuous_calendar:

    if force_stock_market_calendar:
        index = pd.to_datetime(stock_market_calendar)
        ts = ts.reindex(index=index)

    ts.dropna(subset=check_fields, inplace=True)

    if use_adj:

    if start < ts.index[0]:
        start = ts.index[0]
    if end > ts.index[-1]:
        end = ts.index[-1]
    ts = ts[start - datetime.timedelta(365):end]

    return ts

def finalize_timeseries(ts, start, dropna=False, drop_columns=None):
    """
    Finalize timeseries.

    Optionally drop columns and rows that contain NaN values, then
    trim the timeseries so that it begins at `start`.

    Parameters
    ----------
    ts : pd.DataFrame
        The timeseries of a symbol.  Column and NaN-row drops are
        applied to this object in place.
    start : datetime.datetime
        The start date for backtest.
    dropna : bool, optional
        Drop rows that have a NaN value in one of its columns
        (default is False).  When False and NaN values are present,
        a warning is issued instead.
    drop_columns : list of str, optional
        List of columns to drop from `ts` (default is None, which
        implies that no columns should be dropped).

    Returns
    -------
    ts : pd.DataFrame
        The timeseries of a symbol, beginning at or after `start`.
    start : datetime-like
        The start date actually used, i.e. the first index value of
        the returned timeseries.

    Raises
    ------
    IndexError
        If no rows remain at or after `start`.
    """
    if drop_columns:
        ts.drop(columns=drop_columns, inplace=True)
    if dropna:
        ts.dropna(inplace=True)
    elif ts.isnull().values.any():
        warnings.warn("NaN value(s) detected in timeseries")
    ts = ts[start:]
    # The requested start may not be an index value (e.g. a weekend or
    # a dropped row), so report the first date that is present.
    start = ts.index[0]
    return ts, start

# CACHE SYMBOLS (remove, update, get_symbol_metadata)

def _difference_in_years(start, end):
    """
    Calculate the number of years between two dates.

    Parameters
    ----------
    start : datetime.datetime
        The start date.
    end : datetime.datetime
        The end date.

    Returns
    -------
    float
        The difference in years between start and end dates.  The
        value is negative when `end` is earlier than `start`.
    """
    diff = end - start
    # Whole days plus the fractional day, divided by the mean length of
    # a Gregorian year in days.
    diff_in_years = (diff.days + diff.seconds / 86400) / 365.2425
    return diff_in_years

def remove_cache_symbols(symbols=None, dir_name='data'):
    Remove cached timeseries for list of symbols.

    Filter out any symbols prefixed with '__'.

    symbols : str or list of str, optional
        The symbol(s) for which to remove cached timeseries (default
        is None, which implies remove timeseries for all symbols).
    dir_name : str, optional
        The leaf data dir name (default is 'data').

    cache_dir = _get_cache_dir(dir_name)

    if symbols:
        # If symbols is not a list, cast it to a list.
        if not isinstance(symbols, list):
            symbols = [symbols]
        filenames = [symbol.upper() + '.csv' for symbol in symbols]
        filenames = [f for f in os.listdir(cache_dir) if f.endswith('.csv')]

    # Filter out any filename prefixed with '__'.
    filenames = [f for f in filenames if not f.startswith('__')]

    print('removing symbols:')
    for i, f in enumerate(filenames):
        symbol = os.path.splitext(f)[0]
        print(symbol + ' ', end='')
        if i % 10 == 0 and i != 0: print()

        filepath = os.path.join(cache_dir, f)
        if os.path.exists(filepath):
            print(f'\n({f} not found)')

def update_cache_symbols(symbols=None, dir_name='data', from_year=None):
    Update cached timeseries for list of symbols.

    Filter out any filename prefixed with '__'.

    symbols : str or list, optional
        The symbol(s) for which to update cached timeseries (default
        is None, which implies update timeseries for all symbols).
    dir_name : str, optional
        The leaf data dir name (default is 'data').
    from_year: int, optional
        The start year for timeseries retrieval (default is None,
        which implies that all the available data is retrieved).

    cache_dir = _get_cache_dir(dir_name)

    if symbols:
        # If symbols is not a list, cast it to a list.
        if not isinstance(symbols, list):
            symbols = [symbols]
        filenames = ([f for f in os.listdir(cache_dir)
                      if f.endswith('.csv') and not f.startswith('__')])
        symbols = [os.path.splitext(filename)[0] for filename in filenames]

    # Make symbol names uppercase.
    symbols = [symbol.upper() for symbol in symbols]

    print('updating symbols:')
    for i, symbol in enumerate(symbols):
        print(symbol + ' ', end='')
        if i % 10 == 0 and i != 0:

            fetch_timeseries(symbol, dir_name=dir_name, use_cache=False,
        except RemoteDataError as e:
        except Exception as e:

def get_symbol_metadata(symbols=None, dir_name='data', from_year=None):
    Get symbol metadata for list of symbols.

    Filter out any filename prefixed with '__'.

    symbols : str or list, optional
        The symbol(s) for which to get symbol metadata (default
        is None, which implies get symbol metadata for all symbols).
    dir_name : str, optional
        The leaf data dir name (default is 'data').
    from_year: int, optional
        The start year for timeseries retrieval (default is None,
        which implies that all the available data is retrieved).

        Each row contains metadata for a symbol.
    cache_dir = _get_cache_dir(dir_name)

    if symbols:
        # If symbols is not a list, cast it to a list.
        if not isinstance(symbols, list):
            symbols = [symbols]
        filenames = ([f for f in os.listdir(cache_dir)
                     if f.endswith('.csv') and not f.startswith('__')])
        symbols = [os.path.splitext(filename)[0] for filename in filenames]

    # Make symbol names uppercase.
    symbols = [symbol.upper() for symbol in symbols]

    l = []
    for i, symbol in enumerate(symbols):
            ts = fetch_timeseries(symbol, dir_name=dir_name, use_cache=True,
            start = ts.index[0].to_pydatetime()
            end = ts.index[-1].to_pydatetime()
            num_years = _difference_in_years(start, end)
            start = start.strftime('%Y-%m-%d')
            end = end.strftime('%Y-%m-%d')
            t = (symbol, start, end, num_years)
        except RemoteDataError as e:
        except Exception as e:
    columns = ['symbol', 'start_date', 'end_date', 'num_years']
    df = pd.DataFrame(l, columns=columns)
    return df


def fetch_timeseries(symbol, dir_name='data', use_cache=True, from_year=None)

Read time series data.

Use cached version if it exists and use_cache is True, otherwise retrieve, cache, then read.


symbol : str
The symbol for a security.
dir_name : str, optional
The leaf data dir name (default is 'data').
use_cache : bool, optional
True to use data cache. False to retrieve from the internet (default is True).
from_year : int, optional
The start year for timeseries retrieval (default is None, which implies that all the available data is retrieved).


The timeseries of a symbol.
Expand source code
def fetch_timeseries(symbol, dir_name='data', use_cache=True, from_year=None):
    Read time series data.

    Use cached version if it exists and use_cache is True, otherwise
    retrieve, cache, then read.

    symbol : str
        The symbol for a security.
    dir_name : str, optional
        The leaf data dir name (default is 'data').
    use_cache: bool, optional
        True to use data cache.  False to retrieve from the internet 
        (default is True).
    from_year: int, optional
        The start year for timeseries retrieval (default is None,
        which implies that all the available data is retrieved).

        The timeseries of a symbol.
    if from_year is None:
        from_year = 1900 if not sys.platform.startswith('win') else 1971

    # Yahoo finance uses '-' where '.' is used in symbol names.
    symbol = symbol.replace('.', '-')
    symbol = symbol.upper()

    # pinkfish allows the use of a suffix starting with a '_',
    # like SPY_SHRT, so extract the symbol.
    symbol = symbol.split('_')[0]

    timeseries_cache = os.path.join(_get_cache_dir(dir_name), symbol + '.csv')

    if os.path.isfile(timeseries_cache) and use_cache:
            ts = pdr.get_data_yahoo(symbol, start=datetime.datetime(from_year, 1, 1), progress=False)
        except RemoteDataError as e:
        except Exception as e:
            ts.to_csv(timeseries_cache, encoding='utf-8')

    ts = pd.read_csv(timeseries_cache, index_col='Date', parse_dates=True)
    ts = _adj_column_names(ts)

    # Remove rows that have duplicated index.
    ts = ts[~ts.index.duplicated(keep='first')]
    return ts
def finalize_timeseries(ts, start, dropna=False, drop_columns=None)

Finalize timeseries.

Drop all rows that have nan column values. Set timeseries to begin at start.


ts : pd.DataFrame
The timeseries of a symbol.
start : datetime.datetime
The start date for backtest.
dropna : bool, optional
Drop rows that have a NaN value in one of its columns (default is False).
drop_columns : list of str, optional
List of columns to drop from ts (default is None, which implies that no columns should be dropped).


The start date.
The timeseries of a symbol.
Expand source code
def finalize_timeseries(ts, start, dropna=False, drop_columns=None):
    """
    Finalize timeseries.

    Optionally drop columns and rows that contain NaN values, then
    trim the timeseries so that it begins at `start`.

    Parameters
    ----------
    ts : pd.DataFrame
        The timeseries of a symbol.  Column and NaN-row drops are
        applied to this object in place.
    start : datetime.datetime
        The start date for backtest.
    dropna : bool, optional
        Drop rows that have a NaN value in one of its columns
        (default is False).  When False and NaN values are present,
        a warning is issued instead.
    drop_columns : list of str, optional
        List of columns to drop from `ts` (default is None, which
        implies that no columns should be dropped).

    Returns
    -------
    ts : pd.DataFrame
        The timeseries of a symbol, beginning at or after `start`.
    start : datetime-like
        The start date actually used, i.e. the first index value of
        the returned timeseries.

    Raises
    ------
    IndexError
        If no rows remain at or after `start`.
    """
    if drop_columns:
        ts.drop(columns=drop_columns, inplace=True)
    if dropna:
        ts.dropna(inplace=True)
    elif ts.isnull().values.any():
        warnings.warn("NaN value(s) detected in timeseries")
    ts = ts[start:]
    # The requested start may not be an index value (e.g. a weekend or
    # a dropped row), so report the first date that is present.
    start = ts.index[0]
    return ts, start
def get_symbol_metadata(symbols=None, dir_name='data', from_year=None)

Get symbol metadata for list of symbols.

Filter out any filename prefixed with '__'.


symbols : str or list, optional
The symbol(s) for which to get symbol metadata (default is None, which implies get symbol metadata for all symbols).
dir_name : str, optional
The leaf data dir name (default is 'data').
from_year : int, optional
The start year for timeseries retrieval (default is None, which implies that all the available data is retrieved).


Each row contains metadata for a symbol.
Expand source code
def get_symbol_metadata(symbols=None, dir_name='data', from_year=None):
    Get symbol metadata for list of symbols.

    Filter out any filename prefixed with '__'.

    symbols : str or list, optional
        The symbol(s) for which to get symbol metadata (default
        is None, which implies get symbol metadata for all symbols).
    dir_name : str, optional
        The leaf data dir name (default is 'data').
    from_year: int, optional
        The start year for timeseries retrieval (default is None,
        which implies that all the available data is retrieved).

        Each row contains metadata for a symbol.
    cache_dir = _get_cache_dir(dir_name)

    if symbols:
        # If symbols is not a list, cast it to a list.
        if not isinstance(symbols, list):
            symbols = [symbols]
        filenames = ([f for f in os.listdir(cache_dir)
                     if f.endswith('.csv') and not f.startswith('__')])
        symbols = [os.path.splitext(filename)[0] for filename in filenames]

    # Make symbol names uppercase.
    symbols = [symbol.upper() for symbol in symbols]

    l = []
    for i, symbol in enumerate(symbols):
            ts = fetch_timeseries(symbol, dir_name=dir_name, use_cache=True,
            start = ts.index[0].to_pydatetime()
            end = ts.index[-1].to_pydatetime()
            num_years = _difference_in_years(start, end)
            start = start.strftime('%Y-%m-%d')
            end = end.strftime('%Y-%m-%d')
            t = (symbol, start, end, num_years)
        except RemoteDataError as e:
        except Exception as e:
    columns = ['symbol', 'start_date', 'end_date', 'num_years']
    df = pd.DataFrame(l, columns=columns)
    return df
def remove_cache_symbols(symbols=None, dir_name='data')

Remove cached timeseries for list of symbols.

Filter out any symbols prefixed with '__'.


symbols : str or list of str, optional
The symbol(s) for which to remove cached timeseries (default is None, which implies remove timeseries for all symbols).
dir_name : str, optional
The leaf data dir name (default is 'data').


Expand source code
def remove_cache_symbols(symbols=None, dir_name='data'):
    Remove cached timeseries for list of symbols.

    Filter out any symbols prefixed with '__'.

    symbols : str or list of str, optional
        The symbol(s) for which to remove cached timeseries (default
        is None, which implies remove timeseries for all symbols).
    dir_name : str, optional
        The leaf data dir name (default is 'data').

    cache_dir = _get_cache_dir(dir_name)

    if symbols:
        # If symbols is not a list, cast it to a list.
        if not isinstance(symbols, list):
            symbols = [symbols]
        filenames = [symbol.upper() + '.csv' for symbol in symbols]
        filenames = [f for f in os.listdir(cache_dir) if f.endswith('.csv')]

    # Filter out any filename prefixed with '__'.
    filenames = [f for f in filenames if not f.startswith('__')]

    print('removing symbols:')
    for i, f in enumerate(filenames):
        symbol = os.path.splitext(f)[0]
        print(symbol + ' ', end='')
        if i % 10 == 0 and i != 0: print()

        filepath = os.path.join(cache_dir, f)
        if os.path.exists(filepath):
            print(f'\n({f} not found)')
def select_tradeperiod(ts, start, end, use_adj=False, use_continuous_calendar=False, force_stock_market_calendar=False, check_fields=['close'])

Select the trade period.

First, remove rows that have zero values in price columns. Then, select a time slice of the data to trade from ts. Back date a year to allow time for long term indicators, e.g. 200sma, to become valid.


ts : pd.DataFrame
The timeseries of a symbol.
start : datetime.datetime
The desired start date for the strategy.
end : datetime.datetime
The desired end date for the strategy.
use_adj : bool, optional
True to adjust prices for dividends and splits (default is False).
use_continuous_calendar : bool, optional
True if your timeseries has data for all seven days a week, and you want to backtest trading every day, including weekends. If this value is True, then force_stock_market_calendar is set to False (default is False).
force_stock_market_calendar : bool, optional
True forces use of stock market calendar on timeseries. Normally, you don't need to do this. This setting is intended to transform a continuous timeseries into a weekday timeseries. If this value is True, then use_continuous_calendar is set to False (default is False).
check_fields : list of str, optional {'high', 'low', 'open',
'close', 'adj_close'} Fields to check for NaN values. If a NaN value is found for one of these fields, that row is dropped (default is ['close']).


The timeseries for specified start:end, optionally with prices adjusted.


You should only set one of use_continuous_calendar=True or force_stock_market_calendar=True for a continuous timeseries. You should set neither of these to True if your timeseries is based on the stock market.

Expand source code
def select_tradeperiod(ts, start, end, use_adj=False,
    Select the trade period.

    First, remove rows that have zero values in price columns. Then,
    select a time slice of the data to trade from ts.  Back date a year
    to allow time for long term indicators, e.g. 200sma, to become valid.

    ts : pd.DataFrame
        The timeseries of a symbol.
    start : datetime.datetime
        The desired start date for the strategy.
    end : datetime.datetime
        The desired end date for the strategy.
    use_adj : bool, optional
        True to adjust prices for dividends and splits
        (default is False).
    use_continuous_calendar: bool, optional
        True if your timeseries has data for all seven days a week,
        and you want to backtest trading every day, including weekends.
        If this value is True, then `force_stock_market_calendar`
        is set to False (default is False).
    force_stock_market_calendar : bool, optional
        True forces use of stock market calendar on timeseries.
        Normally, you don't need to do this.  This setting is intended
        to transform a continuous timeseries into a weekday timeseries.
        If this value is True, then `use_continuous_calendar` is set
        to False (default is False).
    check_fields : list of str, optional {'high', 'low', 'open',
        'close', 'adj_close'}
        Fields to check for NaN values.  If a NaN value is found
        for one of these fields, that row is dropped
        (default is ['close']).

        The timeseries for specified start:end, optionally with prices
        adjusted.

    You should only set one of `use_continuous_calendar`=True or
    `force_stock_market_calendar`=True for a continuous timeseries.
    You should set neither of these to True if your timeseries is based
    on the stock market.
    columns = ['high', 'low', 'open', 'close']
    if use_adj:

    # Replace 0 value columns with NaN.
    ts[columns] = ts[ts[columns] > 0][columns]

    if use_continuous_calendar:
        force_stock_market_calendar = False
    if force_stock_market_calendar:
        use_continuous_calendar = False

    if use_continuous_calendar:

    if force_stock_market_calendar:
        index = pd.to_datetime(stock_market_calendar)
        ts = ts.reindex(index=index)

    ts.dropna(subset=check_fields, inplace=True)

    if use_adj:

    if start < ts.index[0]:
        start = ts.index[0]
    if end > ts.index[-1]:
        end = ts.index[-1]
    ts = ts[start - datetime.timedelta(365):end]

    return ts
def update_cache_symbols(symbols=None, dir_name='data', from_year=None)

Update cached timeseries for list of symbols.

Filter out any filename prefixed with '__'.


symbols : str or list, optional
The symbol(s) for which to update cached timeseries (default is None, which implies update timeseries for all symbols).
dir_name : str, optional
The leaf data dir name (default is 'data').
from_year : int, optional
The start year for timeseries retrieval (default is None, which implies that all the available data is retrieved).


Expand source code
def update_cache_symbols(symbols=None, dir_name='data', from_year=None):
    Update cached timeseries for list of symbols.

    Filter out any filename prefixed with '__'.

    symbols : str or list, optional
        The symbol(s) for which to update cached timeseries (default
        is None, which implies update timeseries for all symbols).
    dir_name : str, optional
        The leaf data dir name (default is 'data').
    from_year: int, optional
        The start year for timeseries retrieval (default is None,
        which implies that all the available data is retrieved).

    cache_dir = _get_cache_dir(dir_name)

    if symbols:
        # If symbols is not a list, cast it to a list.
        if not isinstance(symbols, list):
            symbols = [symbols]
        filenames = ([f for f in os.listdir(cache_dir)
                      if f.endswith('.csv') and not f.startswith('__')])
        symbols = [os.path.splitext(filename)[0] for filename in filenames]

    # Make symbol names uppercase.
    symbols = [symbol.upper() for symbol in symbols]

    print('updating symbols:')
    for i, symbol in enumerate(symbols):
        print(symbol + ' ', end='')
        if i % 10 == 0 and i != 0:

            fetch_timeseries(symbol, dir_name=dir_name, use_cache=False,
        except RemoteDataError as e:
        except Exception as e: