from copy import copy
from pathlib import Path
from pydantic import BaseModel
-from typing import Callable
+from typing import Callable, Iterable, Optional
from nom.utils import NomError
# TODO: What if there's a pipe in one of the fields?
def to_str(self, delimiter: str ='|'):
- return delimiter.join([v for v in self.__dict__.values()])
+ return delimiter.join([str(v) for v in self.__dict__.values()])
def to_dict(self):
return vars(self)
return cls(**dct)
-class NomList:
+class NomList(set):
- def __init__(self, items=set(), delimiter: str="|"):
- self.delimiter=delimiter
- self.items : set[NomListItem] = items
+ def __init__(self, elements: Optional[Iterable[NomListItem]]=None):
+ if not elements:
+ super().__init__()
+ else:
+ super().__init__(elements)
def __add__(self, other):
- dct = copy(vars(self))
- dct['items'] = self.items.union(other.items)
- return self.__class__(**dct)
+ return self.__class__(self.union(other))
- def __contains__(self, value):
- return value in self.items
-
- def __eq__(self, other):
- return self.items == other.items
-
- def __iter__(self):
- return self.items.__iter__()
-
- def __len__(self):
- return len(self.items)
-
- def merge(self, other):
- self.items.update(other.items)
+ def select(self, predicate: Predicate):
+ items = {item for item in self if predicate(item)}
+ return self.__class__(items)
def to_stdout(self):
for item in self.items:
for row in reader:
item = nlitem.from_dict(row)
items.append(item)
- return cls(items=set(items), delimiter=delimiter)
+ return cls(items)
- def to_csv(self, file: Path):
- if not self.items:
+ def to_csv(self, file: Path, delimiter="|"):
+ if not self:
raise NomError("There are no entries to write.")
- fieldnames=next(iter(self.items)).get_fieldnames()
+ fieldnames=next(iter(self)).get_fieldnames()
dialect = excel_tab
- dialect.delimiter=self.delimiter
+ dialect.delimiter=delimiter
with open(file, "w") as f:
writer = DictWriter(f, fieldnames=fieldnames, dialect=dialect)
writer.writeheader()
- for item in self.items:
+ for item in self:
writer.writerow(item.to_dict())
def to_stdout(self):
- if not self.items:
+ if not self:
raise NomError("There are no entries to write.")
- for item in self.items:
+ for item in self:
print(item.to_str())
-
-def filter(nlist: NomList, predicate: Predicate):
- items = {item for item in nlist.items if predicate(item)}
- return nlist.__class__(items, delimiter=nlist.delimiter)
\ No newline at end of file
+
\ No newline at end of file
from pathlib import Path
+import sys
from nom.utils import url2filename, NomError
from nom.feed import Feed, FeedList
FEED_LIST=Path.home() / ".local" / "share" / "nom" / "feedlist" / "default"
# TODO: Flesh out CLI.
-def main():
+def main(args=['nom'].append(sys.argv)):
parser = cli()
- args = parser.parse_args()
+ args = parser.parse_args(args=args)
# Direct Logic
feedlist=FeedList.from_csv(FEED_LIST)
if args.command == "entry" and args.entry_command == "show":
elist=EntryList()
for flitem in feedlist:
- elist.merge(flitem.to_feed().to_entrylist())
+ elist += flitem.to_feed().to_entrylist()
elist.to_stdout()
elif args.command == "feed" and args.feed_command == "update":
feedlist.fetch_feeds(FEED_CACHE)
import pytest
from nom.entry import EntryList, EntryListItem
-from nom.filter import is_viewed
from test_feed import feedlist
@pytest.fixture
def elist_item(elist_single):
- return next(iter(elist_single.items))
+ return next(iter(elist_single))
def test_elist_constructors(elist_single):
remade = elist_item.from_dict(elist_item.to_dict())
assert remade == elist_item
-def test_elist_merge(elist_multi, elist_single):
- original_length = len(elist_multi)
- elist_multi.merge(elist_single)
- assert len(elist_multi) == original_length + 1
-
def test_elist_addition(elist_multi, elist_single):
sum_ = elist_multi + elist_single
assert len(sum_) == len(elist_multi) + len(elist_single)
assert isinstance(sum_,EntryList)
-def test_elist_filter(elist_multi):
- #viewed = filter(is_viewed, elist_multi)
- from nom.base import filter
- viewed=filter(elist_multi, lambda e: e.viewed)
+def test_elist_select(elist_multi):
+ viewed = elist_multi.select(lambda e: e.viewed)
assert len(viewed) < len(elist_multi)