0.6.0
The 0.6.0 release removes support for Scala versions below 2.11, migrates to Scalaz iteratees and adds support for 2.13's new collection approach.
Build-wise, it has moved to Maven and GitHub Actions to simplify rolling out any future fixes; similarly, the docs have migrated from the home-brew site builder to MkDocs. The feature scope is:
Scalaz Iteratee
The pull API (and async pull API) is now fully monadic and will require code changes. The default Id implementation is not (always) tail recursive and leads to StackOverflowErrors (SOEs). As such, the iteratees are no longer imported by utils._ directly, and you must choose the right import for your use case. PullTest.testFoldOnDoneId, for example, fails on 2.11 and 2.12 but sometimes works on 2.13 when running all tests (despite 2.13 using the same Scalaz version as 2.12), so YMMV across runtimes.
Iteratee[E,A] is a type alias for IterateeT[E,Id,A], so everything is "wrapped" in an Id container (Id[X]=X). You may therefore need to specify an appropriate Monad type, e.g. IO or Trampoline, to get correct compilation and runtime behaviour. Indeed, if the compiler doesn't report an error, the monad has typically been inferred as Id, which can SOE.
To help with this, and with type inference for a specific monad, the following helper functions and defaults are provided:
def iterateesOf[F[_]]: IterateeFunctions[F]
implicit val ioIteratees = iterateesOf[IO]
implicit val trampolineIteratees = iterateesOf[Trampoline]
// not recommended but may help migrations
implicit val idIteratees = iterateesOf[Id]
// similarly
def pullIterateesOf[F[_]]: PullIterateeFunctions[F]
implicit val ioPullIteratees = pullIterateesOf[IO]
implicit val trampolinePullIteratees = pullIterateesOf[Trampoline]
// not recommended but may help migrations
implicit val idPullIteratees = pullIterateesOf[Id]
The functions in IterateeFunctions already have F captured with the type TheF (although F is not stored as an implicit, because that would conflict with import Scalaz._). So using:
import scales.utils.trampolineIteratees._
import scales.xml.trampolinePullIteratees._
import scales.utils._
import scales.xml._
will bring in the iteratee functions already defined within Trampoline, allowing better type inference (you may need to place the imports at different scopes to shadow properly). Alternatively, as the values are implicit, you can bind them implicitly (via a context bound or an implicit parameter) for the monad you are interested in.
If your code can benefit from either specifying the monad at use site or type inference you can import the base functions with F[_] type parameters via:
import scales.utils.iteratee.functions._
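As background for why an Id-based default can overflow the stack while a trampolined monad does not, here is a minimal sketch using only the Scala standard library. It uses scala.util.control.TailCalls as a stand-in for Scalaz's Trampoline (an assumption made to keep the example dependency-free), and the names TrampolineSketch, sumDirect and sumTrampolined are hypothetical.

```scala
import scala.util.control.TailCalls._

object TrampolineSketch {
  // Direct (Id-like) recursion: every element adds a stack frame,
  // so a large enough n can end in a StackOverflowError.
  def sumDirect(n: Int): Long =
    if (n == 0) 0L else n + sumDirect(n - 1)

  // Trampolined recursion: each step is described as data (a TailRec)
  // and only executed, in a loop on the heap, when `result` is called.
  def sumTrampolined(n: Int): TailRec[Long] =
    if (n == 0) done(0L)
    else tailcall(sumTrampolined(n - 1)).map(_ + n)

  def main(args: Array[String]): Unit =
    println(sumTrampolined(1000000).result)
}
```

The trade-off is the same one described above: the trampolined version allocates a description of each step, which costs some speed but keeps the stack depth constant.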
Diverging implicit expansion for type scalaz..
This is probably occurring because you have imported the functions twice, either for different monads or alongside the underlying functions. To ensure the implicits are resolved with the correct types, it is best to pass them explicitly at the use site, e.g.:
import scales.utils.trampolineIteratees._
val iter: IterateeT[PullType, TheF, Option[PullType]] =
  for {
    _ <- peek[PullType, TheF]
    _ <- peek[PullType, TheF]
    _ <- peek[PullType, TheF]
    _ <- peek[PullType, TheF]
    _ <- peek[PullType, TheF]
    i <- evalWith((p: PullType) => {
      p
    })(implicitly[Applicative[TheF]])
    j <- dropWhile((p: PullType) => {
      p.fold(x => !x.isInstanceOf[Elem], y => false)
    })(implicitly[Monad[TheF]])
  } yield j
Direct usage of iteratees largely involves swapping Cont and Done for cont and done and migrating enumerator / run usage to:
(iteratee &= iteratorEnumerator(pull.it)) run
instead of
iteratee(pull.it) run
This is because enumerators are no longer implicitly bound; using the monad type directly can notably increase verbosity. Scalaz now provides a good selection of useful starting enumerators.
The toResumableIter implicit conversion has been removed as this is no longer deemed a correct way to convert. It is now available through an explicit .toResumableIter call:
import scales.utils.iteratee.functions.IterOps
val enum = (i: Int) => enumToMany(sum[Int].toResumableIter)(f(i))
foldI, foldIM and repeatUntil / repeatUntilM
foldI and foldIM have an additional stopOn parameter alongside the init starter parameter. By default both functions will, as before, stop on each done. This can be overridden to control when done is triggered, based on the ACC type.
This pattern is used to implement foldOnDone and a number of tests via a repeat EnumeratorT. As such there is a new function, repeatUntil[M], which mimics a do-while / repeat-until loop, i.e. the following are equivalent:
repeatUntilM((initAcc, starter, false))(a => {
  val (currentA, itr, _) = a
  for {
    step <- itr.value
    isdone = isDoneS(step)
    iseof = isEOFS(step)
    shouldStop = (currentA, itr, true)
    res =
      if (isdone && !iseof) {
        val a = extractS(step)
        if (a.isEmpty)
          shouldStop
        else
          (f(currentA, a.get), extractContS(step), false)
      } else
        shouldStop
  } yield {
    val (currentA, itr, done) = res
    (currentA, (itr &= e).eval, done)
  }
})(stopOn = a => a._3)
and
(foldIM[ACC,F,(ACC, ResumableIter[E,F,A], Boolean)]((p, a) => {
  val (currentA, itr, _) = a
  for {
    step <- itr.value
    isdone = isDoneS(step)
    iseof = isEOFS(step)
    shouldStop = (currentA, itr, true)
    res =
      if (isdone && !iseof) {
        val a = extractS(step)
        if (a.isEmpty)
          shouldStop
        else
          (f(currentA, a.get), extractContS(step), false)
      } else
        shouldStop
  } yield {
    val (currentA, itr, done) = res
    (currentA, (itr &= e).eval, done)
  }
})(init = (initAcc, starter, false), stopOn = a => a._3) &= repeat[ACC,F](initAcc) ) run
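The do-while shape that repeatUntil is described as mimicking can be sketched without any monad at all. The following is a plain, non-monadic illustration of those semantics only (run the body, then test stopOn), not the Scales implementation; the object name and signature are hypothetical.

```scala
object RepeatUntilSketch {
  // Apply f at least once, then keep applying it until stopOn accepts the result.
  @annotation.tailrec
  def repeatUntil[A](init: A)(f: A => A)(stopOn: A => Boolean): A = {
    val next = f(init)
    if (stopOn(next)) next else repeatUntil(next)(f)(stopOn)
  }

  def main(args: Array[String]): Unit =
    println(repeatUntil(0)(_ + 1)(_ >= 5))
}
```

As with a do-while loop, the body runs once even when the starting value already satisfies stopOn.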
Upgrading to Monadic usage
Don't trust the types - monads need the "nesting"
The quotes are there because we aren't really nesting, of course (it's the same monad), but converting seemingly simple code can cause weeks of headaches. The following is from AsyncPullTest.testRandomAmounts:
repeatUntilM((startingIteratee, false, 0)) {
  triple =>
    val (c, stop, count) = triple
    if (!stop) {
      F.bind(c.value) {
        cstep =>
          // feed more data
          val newc = c &= dataChunkerEnumerator(wrapped)
          // map the iteratee's new F[A] so we can access the step
          F.map(newc.value) { newcStep =>
            // is the step EOF?
            val isEof = isEOFS(newcStep)
            // return the next iteratee to process and count if we've "paused"
            (newc, isEof, if (isDoneS(cstep)) count else count + 1)
          }
      }
    } else
      F.point(triple)
}(_._2)
This code is meant to stop when there is no more data to process (the isEof check); however, it will fail miserably and in very odd ways. Data is processed in a different order than expected, if at all.
The error here is in the test case, not the underlying code, and is all too easy to make:
F.map(newc.value) { newcStep =>
  val isEof = isEOFS(newcStep)
  (newc, isEof, if (isDoneS(cstep)) count else count + 1)
   ^^^^
}
The fold/repeat expects an iteratee, so passing the "same" iteratee we just mapped against should be fine; it's the same step, right?
Wrong.
We'd expect its effect to happen before the isEOFS call and be finished, but it's now effectively happening twice. Instead, it describes:
Wrap an effect recursively
What we actually want is to bind the actions together in order:
F.map(newc.value) { newcStep =>
  val isEof = isEOFS(newcStep)
  (newcStep.pointI, isEof, if (isDoneS(cstep)) count else count + 1)
}
This now describes:
Wrap an effect that returns a new effect to run wrapped in an iteratee
which is what we really meant to say. In summary, you should expect Id code (SOEs aside) to work in other containers. If it doesn't, this is another area to check.
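The "effect happens twice" problem can be reproduced in isolation. The sketch below uses a deliberately tiny, hypothetical Effect type (not a Scalaz or Scales type) in which nothing happens until run() is called. Handing back the description you just mapped over runs it again, whereas handing back the computed result re-wrapped, the analogue of newcStep.pointI, does not.

```scala
object EffectTwiceSketch {
  // Hypothetical minimal "effect description": lazy until run() is invoked.
  final case class Effect[A](run: () => A) {
    def map[B](f: A => B): Effect[B] = Effect(() => f(run()))
    def flatMap[B](f: A => Effect[B]): Effect[B] = Effect(() => f(run()).run())
  }
  def point[A](a: A): Effect[A] = Effect(() => a)

  // Returns how often the underlying "feed data" effect ran: (mistaken style, fixed style).
  def feedCounts(): (Int, Int) = {
    var feeds = 0
    val feed: Effect[Int] = Effect(() => { feeds += 1; feeds })

    // Mistake: inspect the result, then return the same description for the caller to run.
    val wrong: Effect[Effect[Int]] = feed.map(_ => feed)
    wrong.run().run()
    val wrongCount = feeds

    feeds = 0
    // Fix: return the already computed step, re-wrapped as a finished effect.
    val right: Effect[Effect[Int]] = feed.map(step => point(step))
    right.run().run()
    (wrongCount, feeds)
  }
}
```

With Id the two styles are indistinguishable because nothing is deferred, which is why this class of bug only shows up once a lazy container such as Trampoline or IO is used.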
A more complex example is found within enumToMany doneWith, the original code had:
// signal the end here to toMany, don't care about result
toMany.foldT(done = (a1, y1) => false,
  cont = k => {
    k(Eof[E]); false
  })
returnThis
so it is purely a hidden side effect. Monad-wise, we need to bring the effect into the monad so that it runs at the correct time:
// signal the end here to toMany, don't care about result, tested by testSimpleLoad and testRandomAmounts in AsyncPullTest
iterateeT(
  toMany.foldT(done = (a1, y1) => returnThis.value,
    cont = k => {
      F.bind(k(Eof[E]).value) {
        _ => returnThis.value
      }
    })
)
So although we are still returning returnThis, it is now bound to the action of signalling Eof. When returnThis is evaluated, any resource closing is also triggered.
Why so slow?
IterV allowed non-monadic usage with tail recursion; this is no longer always possible with Scalaz 7's IterateeT. A "great" example of this is the foldOnDone method implementation. The original IterV based code is fairly simple:
var currentI = initResumable(it).eval
var isdone = isDone(currentI)
var currentA = initAcc
while( !isdone || (isdone && !isEOF(currentI)) ) {
  if (isdone) {
    val a = extract(currentI)
    if (!a.isDefined)
      return currentA
    else {
      currentA = f(currentA, a.get)
      currentI = extractCont(currentI)
    }
  }
  currentI = currentI(it).eval
  isdone = isDone(currentI)
}
currentA
i.e. keep extracting the next continuation and call the f fold function when you find a done. The code runs fast (400ms on the dev box) and in constant space, as you'd expect. An "obvious" translation to a monadic version is:
val starter = (initResumable &= e).eval
val r =
  (foldIM[ACC,F,(ACC, ResumableIter[E,F,A], Boolean)]((p, a) => {
    val (currentA, itr, _) = a
    for {
      isdone <- isDone(itr)
      iseof <- isEOF(itr)
      res <-
        if (isdone && !iseof) {
          val a = extract(itr)
          F.map(a) { a =>
            if (!a.isDefined)
              (currentA, itr, true)
            else
              (f(currentA, a.get), extractCont(itr), false)
          }
        } else
          F.point((currentA, itr, true))
    } yield {
      val (currentA, itr, done) = res
      (currentA, (itr &= e).eval, done)
    }
  })(init = (initAcc, starter, false), stopOn = a => a._3) &= repeat[ACC,F](initAcc) ) run
F.map( r ) { r =>
  val ((acc, nr, b), cont) = r
  acc
}
We have to loop, so we use a fold with an early exit and drive it through a never-ending enumerator (whose value "p" above is ignored). Unlike foldI, foldIM expects an accumulator wrapped in the monad. As we are looping we cannot return early and have to handle all exit criteria directly, so more "if" branches are introduced.
Finally, as we must stay in the monad there is a lot of direct map/point and, of course, the hidden bind/flatMaps. This code runs very slowly: 20 seconds on the dev box, up from 400ms for the non-monadic version. Why?
This SO post does an excellent job of explaining how seemingly innocuous and correct code can become a very large chain of O(N) calls under the hood. This version of "r" brings us back to the 400-450ms mark (still a bit slower but no SOEs and monadic):
val r =
  (foldIM[ACC,F,(ACC, ResumableIter[E,F,A], Boolean)]((p, a) => {
    val (currentA, itr, _) = a
    for {
      step <- itr.value
      isdone = isDoneS(step)
      iseof = isEOFS(step)
      shouldStop = (currentA, itr, true)
      res =
        if (isdone && !iseof) {
          val a = extractS(step)
          if (a.isEmpty)
            shouldStop
          else
            (f(currentA, a.get), extractContS(step), false)
        } else
          shouldStop
    } yield {
      val (currentA, itr, done) = res
      (currentA, (itr &= e).eval, done)
    }
  })(init = (initAcc, starter, false), stopOn = a => a._3) &= repeat[ACC,F](initAcc) ) run
Here, instead of two flatMaps plus one flatMap containing a map that is itself mapped, we are left with only one flatMap: that of the underlying step. To enable this there are new functions ending in S, designed to behave like the old IterV based functions and stay out of the monad. If your code runs slower than expected, this is a likely culprit.
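The difference between querying the iteratee three times inside the monad and binding the step once can be illustrated with the standard library's TailRec standing in for F. This is a simplified sketch under that assumption: Step, and the isDoneS, isEOFS and extractS defined here, are hypothetical stand-ins that only borrow the names used above, and the sketch counts evaluations rather than reproducing the timings.

```scala
import scala.util.control.TailCalls._

object SingleBindSketch {
  // Hypothetical stand-in for an evaluated iteratee step.
  final case class Step(finished: Boolean, eof: Boolean, value: Option[Int])

  // Pure helpers in the spirit of the *S functions: they inspect an already evaluated step.
  def isDoneS(s: Step): Boolean = s.finished
  def isEOFS(s: Step): Boolean = s.eof
  def extractS(s: Step): Option[Int] = s.value

  // Returns how often the step was evaluated: (three monadic queries, one bind).
  def evaluations(): (Int, Int) = {
    var evals = 0
    // Every use of `step` describes a fresh evaluation of the underlying computation.
    def step: TailRec[Step] =
      tailcall { evals += 1; done(Step(finished = true, eof = false, value = Some(1))) }

    val threeQueries = for {
      d <- step.map(isDoneS)
      e <- step.map(isEOFS)
      v <- step.map(extractS)
    } yield (d, e, v)
    threeQueries.result
    val slow = evals

    evals = 0
    val oneBind = step.map(s => (isDoneS(s), isEOFS(s), extractS(s)))
    oneBind.result
    (slow, evals)
  }
}
```

Inside a fold that runs once per input element, those extra evaluations and binds are multiplied by the length of the stream, which is consistent with the slowdown described above.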
A note on Id, Trampoline and IO
Although the base Monad types have a consistent API, actually running them is inconsistent (as can be using them in for comprehensions; make sure to import scalaz.Scalaz._). Id doesn't actually get run, Trampoline/Free has a run function and IO has unsafePerformIO.
This makes it a little fragile to simply swap out implementations, despite the handy monad-specific iteratee imports. As such Scales also adds the scales.utils.iteratee.monadHelpers._ import, which provides wrappers for these behaviours, with run/unsafePerformIO becoming "runIt" (perform is taken by Scalaz for common imports).
Note: you'll have to use p.runIt:
import scales.utils.iteratee.monadHelpers._
import scalaz.Scalaz._
val p = doSomethingReturningAnFofA
val a: A = p.runIt
If you are having issues on 2.11/2.12 when the type is Trampoline, make sure the monadHelpers import is "_" or explicitly imports TrampolinePerformer as well (2.13 finds it without issue under Performer).
To write reusable code, require the correct implicits, including the correct monad iteratees:
def theReusableCode[F[_]: Monad: IterateeFunctions: CanRunIt](): F[SomeResult] = {
  ??? // your implementation, written against the implicit IterateeFunctions[F]
}
It is suggested that CanRunIt is only used in testing. Prefer instead to move the actual state change of running the monad to the edges of your code, where you probably know which monad you are using.
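The idea of abstracting over "how to run F" while keeping the run itself at the edge can be sketched with a small type class. Everything below is a hypothetical analogue built on the standard library (TailRec and Function0 as the containers); Runner, runAny and compute are illustrative names, not the Scales CanRunIt API.

```scala
import scala.util.control.TailCalls.{TailRec, done}

object RunAtTheEdgeSketch {
  // Hypothetical analogue of CanRunIt: knows how to turn an F[A] into an A.
  trait Runner[F[_]] {
    def runIt[A](fa: F[A]): A
  }

  object Runner {
    implicit val tailRecRunner: Runner[TailRec] = new Runner[TailRec] {
      def runIt[A](fa: TailRec[A]): A = fa.result
    }
    implicit val thunkRunner: Runner[Function0] = new Runner[Function0] {
      def runIt[A](fa: Function0[A]): A = fa()
    }
  }

  // Test-style helper: runs whichever container it is given.
  def runAny[F[_], A](fa: F[A])(implicit r: Runner[F]): A = r.runIt(fa)

  // Library-style code: stays inside the container and leaves running to the caller.
  def compute(): TailRec[Int] = done(20).map(_ + 22)

  def main(args: Array[String]): Unit =
    println(runAny(compute()))
}
```

A test can call runAny on any supported container, while production code would typically call result (or its equivalent) once, at the point where the concrete monad is known.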
BEWARE THE ITERATOR
As trampolining is now used, anything that generates values with side effects, e.g. Iterator[PullType], can cause significant oddities: a part of the code you had reasoned was finished starts up again and calls next.
To help with this you can wrap the iterator in an EphemeralStream, at the cost of a memory and state trade-off. Three functions are introduced for this:
package scales.utils.iteratee
object EphemeralStreamEnum {
  def enumEphemeralStream[E, F[_] : Monad](xs: EphemeralStream[E]): EnumeratorT[E, F]
  def enumEphemeralStreamF[E, F[_] : Monad](state: EphemeralStream[E] => Unit)(xs: EphemeralStream[E]): EnumeratorT[E, F]
  def toEphemeral[A](iterator: Iterator[A]): EphemeralStream[A]
}
toEphemeral safely wraps an iterator in EphemeralStream ensuring trampolining does not trigger more .next calls.
The enumEphemeralStreamF variant lets you use a workaround to keep the progress and restart processing the stream:
var theStream: EphemeralStream[PullType] = toEphemeral(xmlpull: Iterator[PullType])
// record how far the stream has been consumed so processing can resume from there
val func = (e: EphemeralStream[PullType]) => { theStream = e }
def enum(e: EphemeralStream[PullType]) = enumEphemeralStreamF[PullType, TheF](func)(e)
val starter = (ionDone &= enum(theStream)).eval
// some time later, restart the stream processing
(extractCont(starter) &= enum(theStream)).eval
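Why wrapping a side-effecting iterator helps can be shown with the standard library alone. The sketch below uses LazyList (Scala 2.13+) purely as a stand-in for EphemeralStream, which has different memory characteristics. The point is only that a memoising wrapper lets code revisit elements without calling next again; the object and method names are hypothetical.

```scala
object MemoisedIteratorSketch {
  // Returns (number of elements pulled from the iterator, elements seen on the second traversal).
  def traverseTwice(): (Int, List[Int]) = {
    var pulled = 0
    // A side-effecting source: each element can only be produced once.
    val source: Iterator[Int] = Iterator(1, 2, 3).map { i => pulled += 1; i }

    // LazyList caches each element the first time it is forced.
    val memo: LazyList[Int] = source.to(LazyList)

    memo.toList               // first traversal pulls from the iterator
    val second = memo.toList  // second traversal is served from the cache
    (pulled, second)
  }
}
```

Traversing the raw iterator a second time would instead yield nothing, which is the kind of oddity a re-entered trampolined step can run into.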
Consider iteratee composition
If you are already using iteratees, consider composing them via for comprehensions and processing the stream only once (i.e. not using extractCont) if intermediate values aren't needed.
Scala 2.13 support
2.13 re-worked much of the internal collection logic, including CanBuildFrom. Scales requires the ability to swap out the actual container used for Trees in order to reduce allocation cost (yielding better performance).
As such there is a compatibility layer working around most of this unpleasantness; however, for compatibility CanBuildFrom is still implicitly required in the APIs.
None of this should affect 2.13 usage but please raise an issue should you find one.
Created: May 10, 2024 12:48:10