Source code for pyspread.lib.csv
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright Martin Manns
# Distributed under the terms of the GNU General Public License
# --------------------------------------------------------------------
# pyspread is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pyspread is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pyspread. If not, see <http://www.gnu.org/licenses/>.
# --------------------------------------------------------------------
"""
**Provides**
* :func:`sniff`: Sniffs CSV dialect and header info
* :func:`get_header`
* :func:`csv_reader`
* :func:`convert`
* :func:`date`
* :func:`datetime`
* :func:`time`
* :func:`make_object`
* :dict:`typehandlers`
"""
import ast
import csv
from decimal import Decimal
from pathlib import Path
from typing import TextIO, Iterable, List
try:
from dateutil.parser import parse
except ImportError:
parse = None
try:
from moneyed import Money, list_all_currencies
except ImportError:
Money = None
[docs]
def sniff(filepath: Path, sniff_size: int, encoding: str) -> csv.Dialect:
"""Sniffs CSV dialect and header info
:param filepath: Path of file to sniff
:param sniff_size: Maximum no. bytes to use for sniffing
:param encoding: File encoding
:return: csv.Dialect object with additional attribute `has_header`
"""
with open(filepath, newline='', encoding=encoding) as csvfile:
csv_str = csvfile.read(sniff_size)
dialect = csv.Sniffer().sniff(csv_str)
setattr(dialect, "hasheader", csv.Sniffer().has_header(csv_str))
setattr(dialect, "encoding", encoding)
if dialect.escapechar is None:
setattr(dialect, "quoting", csv.QUOTE_NONE)
setattr(dialect, "escapechar", '"')
return dialect
[docs]
def csv_reader(csvfile: TextIO, dialect: csv.Dialect) -> Iterable[str]:
"""Generator of str values from csv file in filepath, ignores header
:param csvfile: Csv file to read
:param dialect: Csv dialect
"""
csvreader = csv.reader(csvfile, dialect=dialect)
try:
ignore_header = dialect.hasheader and not dialect.keepheader
except AttributeError:
try:
ignore_header = dialect.hasheader
except AttributeError:
ignore_header = False
if ignore_header:
for line in csvreader:
break
for line in csvreader:
yield line
# Type conversion functions
[docs]
def convert(string: str, digest_type: str) -> str:
"""Main type conversion function for csv import
:param string: String to be digested
:param digest_type: Name of digetsion function
:return: Converted string
"""
if digest_type is None:
digest_type = 'repr'
if digest_type.split()[0] == "Money":
currency = digest_type.split()[1][1:-1]
return repr(typehandlers["Money"](string, currency=currency))
try:
return repr(typehandlers[digest_type](string))
except Exception:
return repr(string)
[docs]
def date(obj):
"""Makes a date from comparable types"""
try:
return parse(obj).date()
except TypeError as err:
return err
[docs]
def datetime(obj):
"""Makes a datetime from comparable types"""
try:
return parse(obj)
except TypeError as err:
return err
[docs]
def time(obj):
"""Makes a time from comparable types"""
try:
return parse(obj).time()
except TypeError as err:
return err
[docs]
def make_object(obj):
"""Parses the object with ast.literal_eval"""
return ast.literal_eval(obj)
typehandlers = {
'object': ast.literal_eval,
'repr': lambda x: x,
'str': str,
'int': int,
'float': float,
'decimal': Decimal,
'complex': complex,
'bool': bool,
'bytes': bytes,
}
if Money is not None:
def money(amount, currency="USD"):
"""Money wrapper defining a standard currency"""
return Money(amount, currency=currency)
typehandlers.update({'Money': money})
currencies = list_all_currencies()
else:
currencies = []
if parse is not None:
typehandlers.update({'date': date,
'datetime': datetime,
'time': time,
})